self.store._validate_tlsobject_name("per_host1")
with self.assertRaises(TLSObjectException):
self.store._validate_tlsobject_name("per_service1")
+
+ def test_register_does_not_double_write_on_second_call(self):
+ """
+ Calling register_object_name() twice for the same name should not create a new KV value
+ nor change the existing one (basic idempotency).
+ """
+ name = "per_service_twice"
+ key = f"{self.store.store_prefix}{name}"
+
+ # First register → writes {}
+ self.store.register_object_name(name, TLSObjectScope.SERVICE)
+ first_val = self.mgr.store.get(key)
+ assert first_val is not None
+ assert json.loads(first_val) == {}
+
+ # Second register → value should remain exactly the same
+ self.store.register_object_name(name, TLSObjectScope.SERVICE)
+ second_val = self.mgr.store.get(key)
+ assert second_val == first_val
+
+ def test_register_writes_tombstone_for_all_scopes(self):
+ """
+ register_object_name() must persist a tombstone for any scope:
+ - SERVICE/HOST → "{}" (empty per-target map)
+ - GLOBAL → minimal JSON for an empty TLS object
+ """
+ # Fresh names (not present in objects_by_name yet)
+ svc_name = "per_service_new"
+ host_name = "per_host_new"
+ glob_name = "global_cert_new"
+
+ svc_key = f"{self.store.store_prefix}{svc_name}"
+ host_key = f"{self.store.store_prefix}{host_name}"
+ glob_key = f"{self.store.store_prefix}{glob_name}"
+
+ # Sanity: no keys yet
+ assert svc_key not in self.mgr.store
+ assert host_key not in self.mgr.store
+ assert glob_key not in self.mgr.store
+
+ # Act
+ self.store.register_object_name(svc_name, TLSObjectScope.SERVICE)
+ self.store.register_object_name(host_name, TLSObjectScope.HOST)
+ self.store.register_object_name(glob_name, TLSObjectScope.GLOBAL)
+
+ # Assert KV tombstones
+ assert svc_key in self.mgr.store
+ assert host_key in self.mgr.store
+ assert glob_key in self.mgr.store
+
+ # SERVICE/HOST → empty dict
+ assert json.loads(self.mgr.store[svc_key]) == {}
+ assert json.loads(self.mgr.store[host_key]) == {}
+
+ # GLOBAL → minimal JSON for empty MockTLSObject
+ expected_global_min = MockTLSObject.to_json(MockTLSObject())
+ assert json.loads(self.mgr.store[glob_key]) == expected_global_min
+
+ # Also assert scope lists updated & in-memory skeletons created
+ assert svc_name in self.store.service_scoped_objects
+ assert host_name in self.store.host_scoped_objects
+ assert glob_name in self.store.global_scoped_objects
+
+ assert isinstance(self.store.objects_by_name[svc_name], dict) and self.store.objects_by_name[svc_name] == {}
+ assert isinstance(self.store.objects_by_name[host_name], dict) and self.store.objects_by_name[host_name] == {}
+ # For globals, register() seeds an entry; it will be assigned the empty object later when loaded/saved.
+ # We only need to ensure the name exists in the registry.
+ assert glob_name in self.store.objects_by_name
+
+ def test_register_is_idempotent_and_does_not_overwrite_existing_kv(self):
+ """
+ If a store key already has content, register_object_name() must not clobber it.
+ """
+ # Pre-seed non-empty SERVICE and HOST entries, and a GLOBAL entry
+ svc_name = "per_service_existing"
+ host_name = "per_host_existing"
+ glob_name = "global_existing"
+
+ svc_key = f"{self.store.store_prefix}{svc_name}."
+ host_key = f"{self.store.store_prefix}{host_name}."
+ glob_key = f"{self.store.store_prefix}{glob_name}."
+
+ pre_svc_val = {"svcA": MockTLSObject.to_json(MockTLSObject("svc-seed", True, False))}
+ pre_host_val = {"hostA": MockTLSObject.to_json(MockTLSObject("host-seed", True, False))}
+ pre_glob_val = MockTLSObject.to_json(MockTLSObject("glob-seed", True, False))
+
+ self.mgr.set_store(svc_key, json.dumps(pre_svc_val))
+ self.mgr.set_store(host_key, json.dumps(pre_host_val))
+ self.mgr.set_store(glob_key, json.dumps(pre_glob_val))
+
+ # Act: register (should be a no-op on KV content)
+ self.store.register_object_name(svc_name, TLSObjectScope.SERVICE)
+ self.store.register_object_name(host_name, TLSObjectScope.HOST)
+ self.store.register_object_name(glob_name, TLSObjectScope.GLOBAL)
+
+ # Assert KV unchanged
+ assert json.loads(self.mgr.store[svc_key]) == pre_svc_val
+ assert json.loads(self.mgr.store[host_key]) == pre_host_val
+ assert json.loads(self.mgr.store[glob_key]) == pre_glob_val
+
+ # And registry entries exist
+ assert svc_name in self.store.objects_by_name
+ assert host_name in self.store.objects_by_name
+ assert glob_name in self.store.objects_by_name
def _set_store(self, obj_name: str, payload: Any) -> None:
self.mgr.set_store(self._kv_key(obj_name), json.dumps(payload))
+ def ensure_tombstone(self, obj_name: str, scope: TLSObjectScope) -> None:
+ """
+ Idempotently ensure a tombstone (empty) KV entry for obj_name so the
+ name is rediscovered after manager restarts (see load method).
+
+ - SERVICE/HOST → `{}` (empty per-target map)
+ - GLOBAL → minimal JSON for an empty TLS object
+ """
+
+ # Do nothing if the TLS object name already exists in the store
+ if self.mgr.get_store_prefix(self._kv_key(obj_name)):
+ return
+
+ if scope in (TLSObjectScope.SERVICE, TLSObjectScope.HOST):
+ self._set_store(obj_name, {})
+ elif scope == TLSObjectScope.GLOBAL:
+ empty = self.tlsobject_class() # falsy empty instance
+ self._set_store(obj_name, self.tlsobject_class.to_json(empty))
+
def register_object_name(self, obj_name: str, scope: TLSObjectScope) -> None:
"""
Register a new TLS object name under the specified scope if it does not already exist.
ValueError: If an invalid scope is provided.
"""
if obj_name not in self.objects_by_name:
- # Initialize an empty dictionary to track TLS objects for this obj_name
- self.objects_by_name[obj_name] = {}
+ # Initialize an empty dictionary/TLSobj to track TLS objects for this obj_name
+ if scope in (TLSObjectScope.SERVICE, TLSObjectScope.HOST):
+ self.objects_by_name[obj_name] = {}
+ elif scope == TLSObjectScope.GLOBAL:
+ self.objects_by_name[obj_name] = self.tlsobject_class()
+ else:
+ raise ValueError(f"Invalid TLSObjectScope '{scope}' for obj_name '{obj_name}'")
+ # Initialize an empty tombstone for the TLS object(s) in the store
+ self.ensure_tombstone(obj_name, scope)
# Add to the appropriate scope list
if scope == TLSObjectScope.SERVICE and obj_name not in self.service_scoped_objects: