cast,
)
+import contextlib
import logging
import operator
import time
incoming = order_resources(inputs)
for resource in incoming:
staging.stage(resource)
- for resource in incoming:
- results.append(
- self._check(resource, staging, create_only=create_only)
- )
+ with _store_transaction(staging.destination_store):
+ for resource in incoming:
+ results.append(
+ self._check(
+ resource, staging, create_only=create_only
+ )
+ )
except ErrorResult as err:
results.append(err)
except Exception as err:
'successfully updated %s resources. syncing changes to public stores',
len(list(results)),
)
- results = staging.save()
- _prune_linked_entries(staging)
- self._sync_modified(results)
+ with _store_transaction(staging.destination_store):
+ results = staging.save()
+ _prune_linked_entries(staging)
+ with _store_transaction(staging.destination_store):
+ self._sync_modified(results)
return results
def cluster_ids(self) -> List[str]:
use for data access.
"""
return f'client.smb.fs.cluster.{cluster_id}'
+
+
+@contextlib.contextmanager
+def _store_transaction(store: ConfigStore) -> Iterator[None]:
+ transaction = getattr(store, 'transaction', None)
+ if not transaction:
+ log.debug("No transaction support for store")
+ yield None
+ return
+ log.debug("Using store transaction")
+ with transaction():
+ yield None