f' orch {self._orch!r}'
)
- def apply(self, inputs: Iterable[SMBResource]) -> ResultGroup:
+ def apply(
+ self, inputs: Iterable[SMBResource], *, create_only: bool = False
+ ) -> ResultGroup:
+ """Apply resource configuration changes.
+ Set `create_only` to disable changing existing resource values.
+ """
log.debug('applying changes to internal data store')
results = ResultGroup()
staging = _Staging(self.internal_store)
for resource in incoming:
staging.stage(resource)
for resource in incoming:
- results.append(self._check(resource, staging))
+ results.append(
+ self._check(resource, staging, create_only=create_only)
+ )
except ErrorResult as err:
results.append(err)
except Exception as err:
log.debug("search found %d resources", len(out))
return out
- def _check(self, resource: SMBResource, staging: _Staging) -> Result:
+ def _check(
+ self,
+ resource: SMBResource,
+ staging: _Staging,
+ *,
+ create_only: bool = False,
+ ) -> Result:
"""Check/validate a staged resource."""
log.debug('staging resource: %r', resource)
+ if create_only:
+ if not staging.is_new(resource):
+ return Result(
+ resource,
+ success=False,
+ msg='a resource with the same ID already exists',
+ )
try:
if isinstance(
resource, (resources.Cluster, resources.RemovedCluster)