for filename in config:
if filename in self.agent.required_files:
file_path = os.path.join(self.agent.daemon_dir, filename)
- with open(os.open(file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(file_path, perms=0o600) as f:
f.write(config[filename])
- os.rename(file_path + '.new', file_path)
self.agent.pull_conf_settings()
self.agent.wakeup()
for filename in config:
if filename in self.required_files:
file_path = os.path.join(self.daemon_dir, filename)
- with open(os.open(file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(file_path, perms=0o600) as f:
f.write(config[filename])
- os.rename(file_path + '.new', file_path)
unit_run_path = os.path.join(self.daemon_dir, 'unit.run')
- with open(os.open(unit_run_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(unit_run_path, perms=0o600) as f:
f.write(self.unit_run())
- os.rename(unit_run_path + '.new', unit_run_path)
meta: Dict[str, Any] = {}
meta_file_path = os.path.join(self.daemon_dir, 'unit.meta')
if 'meta_json' in self.ctx and self.ctx.meta_json:
meta = json.loads(self.ctx.meta_json) or {}
- with open(os.open(meta_file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(meta_file_path, perms=0o600) as f:
f.write(json.dumps(meta, indent=4) + '\n')
- os.rename(meta_file_path + '.new', meta_file_path)
unit_file_path = os.path.join(self.ctx.unit_dir, self.unit_name())
- with open(os.open(unit_file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(unit_file_path, perms=0o600) as f:
f.write(self.unit_file())
- os.rename(unit_file_path + '.new', unit_file_path)
call_throws(self.ctx, ['systemctl', 'daemon-reload'])
call(self.ctx, ['systemctl', 'stop', self.unit_name()],