@property
def port(self) -> int:
- if not self.ctx.tcp_ports:
+ ports = fetch_tcp_ports(self.ctx)
+ if not ports:
return self.DEFAULT_PORT
- else:
- if len(self.ctx.tcp_ports) > 0:
- return int(self.ctx.tcp_ports.split()[0])
- else:
- return self.DEFAULT_PORT
+ return ports[0]
def get_daemon_args(self) -> List[str]:
v3_args = []
migrate_sysctl_dir(ctx, ctx.fsid)
# Get and check ports explicitly required to be opened
- daemon_ports = [] # type: List[int]
-
- if ctx.tcp_ports:
- daemon_ports = list(map(int, ctx.tcp_ports.split()))
+ daemon_ports = fetch_tcp_ports(ctx)
_common_deploy(ctx, daemon_type, daemon_id, daemon_ports, deployment_type)
migrate_sysctl_dir(ctx, ctx.fsid)
# Get and check ports explicitly required to be opened
- daemon_ports = [] # type: List[int]
-
- if ctx.tcp_ports:
- daemon_ports = list(map(int, ctx.tcp_ports.split()))
+ daemon_ports = fetch_tcp_ports(ctx)
_common_deploy(ctx, daemon_type, daemon_id, daemon_ports, deployment_type)
else:
call_throws(ctx, ['rm', '-rf', data_dir])
- if 'tcp_ports' in ctx and ctx.tcp_ports is not None:
- ports: List[int] = [int(p) for p in ctx.tcp_ports.split()]
+ ports: List[int] = fetch_tcp_ports(ctx)
+ if ports:
try:
fw = Firewalld(ctx)
fw.close_ports(ports)