if (info.syncstopped) {
call(new RGWRadosRemoveCR(store, obj));
} else {
- status.state = rgw_bucket_shard_sync_info::StateFullSync;
- status.inc_marker.position = info.max_marker;
+ // whether or not to do full sync, incremental sync will follow anyway
+ if (sync_env->sync_module->should_full_sync()) {
+ status.state = rgw_bucket_shard_sync_info::StateFullSync;
+ status.inc_marker.position = info.max_marker;
+ } else {
+ status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+ status.inc_marker.position = "";
+ }
map<string, bufferlist> attrs;
status.encode_all_attrs(attrs);
call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
}
virtual RGWMetadataHandler *alloc_bucket_meta_handler();
virtual RGWMetadataHandler *alloc_bucket_instance_meta_handler();
+
+ // indicates whether the sync module starts with full sync (default behavior)
+ // incremental sync would follow anyway
+ virtual bool should_full_sync() const {
+ return true;
+ }
};
typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef;
"uid": <uid>, # default: "pubsub"
"data_bucket_prefix": <prefix> # default: "pubsub-"
"data_oid_prefix": <prefix> #
+ "events_retention_days": <int> # default: 7
+ "start_with_full_sync": <bool> # default: false
# non-dynamic config
"notifications": [
"name": <subscription-name>,
"topic": <topic>,
"push_endpoint": <endpoint>,
- "args:" <arg list>. # any push endpoint specific args (include all args)
+ "push_endpoint_args": <arg list> # any push endpoint specific args (include all args)
"data_bucket": <bucket>, # override name of bucket where subscription data will be stored
"data_oid_prefix": <prefix> # set prefix for subscription data object ids
"s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
encode_json("name", name, f);
encode_json("topic", topic, f);
encode_json("push_endpoint", push_endpoint_name, f);
- encode_json("args", push_endpoint_args, f);
+ encode_json("push_endpoint_args", push_endpoint_args, f);
encode_json("data_bucket_name", data_bucket_name, f);
encode_json("data_oid_prefix", data_oid_prefix, f);
encode_json("s3_id", s3_id, f);
std::map<std::string, PSSubConfigRef> subs;
std::map<std::string, PSTopicConfigRef> topics;
std::multimap<std::string, PSNotificationConfig> notifications;
+
+ bool start_with_full_sync{false};
void dump(Formatter *f) const {
encode_json("id", id, f);
f->close_section();
}
}
+ encode_json("start_with_full_sync", start_with_full_sync, f);
}
void init(CephContext *cct, const JSONFormattable& config) {
iter->second->subs.insert(sc->name);
}
}
+ start_with_full_sync = config["start_with_full_sync"](false);
ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
}
return new RGWRESTMgr_PubSub_S3(orig);
}
+bool RGWPSSyncModuleInstance::should_full_sync() const {
+ return data_handler->get_conf()->start_with_full_sync;
+}
+
int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
instance->reset(new RGWPSSyncModuleInstance(cct, config));
return 0;
}
+
const JSONFormattable& get_effective_conf() {
return effective_conf;
}
+ // whether to start with full sync, based on configuration
+ // defaults to incremental-only sync
+ virtual bool should_full_sync() const override;
};
#endif
for record in parsed_result['Records']:
log.debug(record)
keys = list(bucket.list())
- # TODO: set exact_match to true
- verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+ verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=True)
# cleanup
_, status = s3_notification_conf.del_config()
for event in parsed_result['events']:
log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
keys = list(bucket.list())
- # TODO: set exact_match to true
- verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=True)
# delete objects from the bucket
for key in bucket.list():
key.delete()
log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
'" type: "' + str(event['event']) + '"')
keys = list(bucket.list())
- # TODO: set exact_match to true
- verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=True)
# get the events from the deletions subscription
result, _ = sub_delete_conf.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
- # TODO: set exact_match to true
- verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=True)
# delete objects from the bucket
for key in bucket.list():
key.delete()
log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
'" type: "' + str(event['event']) + '"')
# deletions should not change the creation events
- # TODO: set exact_match to true
- verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=True)
# get the events from the deletions subscription
result, _ = sub_delete_conf.get_events()
parsed_result = json.loads(result)
log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
'" type: "' + str(event['event']) + '"')
# only deletions should be listed here
- # TODO: set exact_match to true
- verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=True, deletions=True)
# get the events from the all events subscription
result, _ = sub_create_conf.get_events()
parsed_result = json.loads(result)
log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
'" type: "' + str(event['event']) + '"')
# both deletions and creations should be here
- verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False)
+ verify_events_by_elements(parsed_result['events'], keys, exact_match=True, deletions=False)
# verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
# TODO: (1) test deletions (2) test overall number of events
if next_marker == '':
break
keys = list(bucket.list())
- # TODO: set exact_match to true
- verify_events_by_elements(all_events, keys, exact_match=False)
+ verify_events_by_elements(all_events, keys, exact_match=True)
# cleanup
sub_conf.del_config()
for event in events:
log.debug('Event (before ack) id: "' + str(event['id']) + '"')
keys = list(bucket.list())
- # TODO: set exact_match to true
- verify_events_by_elements(events, keys, exact_match=False)
+ verify_events_by_elements(events, keys, exact_match=True)
# ack half of the events
events_to_ack = number_of_objects/2
for event in events:
topic_name)
result, _ = sub_conf.get_events()
parsed_result = json.loads(result)
- # TODO: set exact_match to true
- verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+ verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=True)
# s3 notification is deleted with bucket
_, status = s3_notification_conf.get_config(notification=notification_name)
for record in parsed_result['Records']:
log.debug(record)
keys = list(bucket.list())
- # TODO: set exact_match to true
- verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+ verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=True)
result, _ = sub_conf2.get_events()
parsed_result = json.loads(result)
for record in parsed_result['Records']:
log.debug(record)
keys = list(bucket.list())
- # TODO: set exact_match to true
- verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+ verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=True)
# cleanup
s3_notification_conf.del_config()
class PSZone(Zone): # pylint: disable=too-many-ancestors
""" PubSub zone class """
+ def __init__(self, name, full_sync, retention_days, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+ self.full_sync = full_sync
+ self.retention_days = retention_days
+ super(PSZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
def is_read_only(self):
return True
def create(self, cluster, args=None, **kwargs):
if args is None:
args = ''
- args += ['--tier-type', self.tier_type()]
+ tier_config = ','.join(['start_with_full_sync=' + self.full_sync, 'events_retention_days=' + self.retention_days])
+ args += ['--tier-type', self.tier_type(), '--tier-config', tier_config]
return self.json_command(cluster, 'create', args)
def has_buckets(self):
""" ack events in a subscription """
parameters = {'ack': None, 'event-id': event_id}
return self.send_request('POST', parameters)
+
+
+class PSZoneConfig:
+ """ pubsub zone configuration """
+ def __init__(self, cfg, section):
+ self.full_sync = cfg.get(section, 'start_with_full_sync')
+ self.retention_days = cfg.get(section, 'retention_days')
from rgw_multi.zone_cloud import CloudZone as CloudZone
from rgw_multi.zone_cloud import CloudZoneConfig as CloudZoneConfig
from rgw_multi.zone_ps import PSZone as PSZone
+from rgw_multi.zone_ps import PSZoneConfig as PSZoneConfig
# make tests from rgw_multi.tests available to nose
from rgw_multi.tests import *
es_cfg = []
cloud_cfg = []
+ ps_cfg = []
for s in cfg.sections():
if s.startswith('elasticsearch'):
es_cfg.append(ESZoneConfig(cfg, s))
elif s.startswith('cloud'):
cloud_cfg.append(CloudZoneConfig(cfg, s))
+ elif s.startswith('pubsub'):
+ ps_cfg.append(PSZoneConfig(cfg, s))
+
argv = []
num_es_zones = len(es_cfg)
num_cloud_zones = len(cloud_cfg)
+ num_ps_zones_from_conf = len(ps_cfg)
+
+ num_ps_zones = args.num_ps_zones if num_ps_zones_from_conf == 0 else num_ps_zones_from_conf
+ print 'num_ps_zones = ' + str(num_ps_zones)
- num_zones = args.num_zones + num_es_zones + num_cloud_zones + args.num_ps_zones
+ num_zones = args.num_zones + num_es_zones + num_cloud_zones + num_ps_zones
for zg in range(0, args.num_zonegroups):
zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
ccfg.target_path, zonegroup, cluster)
elif ps_zone:
zone_index = z - args.num_zones - num_es_zones - num_cloud_zones
- zone = PSZone(zone_name(zg, z), zonegroup, cluster)
+ if num_ps_zones_from_conf == 0:
+ zone = PSZone(zone_name(zg, z), "false", "7", zonegroup, cluster)
+ else:
+ pscfg = ps_cfg[zone_index]
+ zone = PSZone(zone_name(zg, z), pscfg.full_sync, pscfg.retention_days, zonegroup, cluster)
else:
zone = RadosZone(zone_name(zg, z), zonegroup, cluster)