]
+def _specialize(opts: Optional[Dict[str, str]] = None) -> bool:
+ return (opts or {}).get('specialize') != 'no'
+
+
+def _mirror_join_auths(opts: Optional[Dict[str, str]] = None) -> bool:
+ return (opts or {}).get('mirror_join_auths') != 'no'
+
+
+def _mirror_users_and_groups(opts: Optional[Dict[str, str]] = None) -> bool:
+ return (opts or {}).get('mirror_users_and_groups') != 'no'
+
+
def mgr_sqlite3_db(
mgr: Any, opts: Optional[Dict[str, str]] = None
) -> SqliteStore:
"""Set up a store for use in the real ceph mgr."""
- specialize = (opts or {}).get('specialize') != 'no'
+ specialize = _specialize(opts)
return SqliteStore(
mgr,
_tables(specialize=specialize),
)
+def mgr_sqlite3_db_with_mirroring(
+ mgr: Any,
+ mirror_store: ConfigStore,
+ opts: Optional[Dict[str, str]] = None,
+) -> SqliteMirroringStore:
+ """Set up a store for use in the ceph mgr that will mirror some
+ objects into an alternate store.
+ """
+ tables = _tables(specialize=_specialize(opts))
+ mirrors: List[Mirror] = []
+ if _mirror_join_auths(opts):
+ mirrors.append(MirrorJoinAuths(mirror_store))
+ if _mirror_users_and_groups(opts):
+ mirrors.append(MirrorUsersAndGroups(mirror_store))
+ return SqliteMirroringStore(mgr, tables, mirrors)
+
+
def memory_db(
*,
specialize: bool = True,