logger.error(err_str)
else:
conn.send(b'ACK')
+ if 'config' in data:
+ self.agent.wakeup()
self.agent.ls_gatherer.wakeup()
self.agent.volume_gatherer.wakeup()
logger.debug(f'Got mgr message {data}')
# Apply a decoded message payload received from the mgr.
#
# The payload must contain 'counter'; its int() value is stored as the agent's
# ack. A missing 'counter' raises KeyError, a non-numeric one raises ValueError.
# If the payload also has a 'config' mapping (file name -> file contents), each
# entry whose name is listed in agent.required_files is written into
# agent.daemon_dir; entries with any other name are ignored.
# NOTE(review): indentation is not preserved in this excerpt, so it is unclear
# whether pull_conf_settings()/wakeup() run only when 'config' is present
# or on every payload -- confirm against the full file.
def handle_json_payload(self, data: Dict[Any, Any]) -> None:
self.agent.ack = int(data['counter'])
+ if 'config' in data:
+ logger.info('Received new config from mgr')
+ config = data['config']
+ for filename in config:
# only names on the required_files allow-list are ever written to disk
+ if filename in self.agent.required_files:
+ with open(os.path.join(self.agent.daemon_dir, filename), 'w') as f:
+ f.write(config[filename])
# re-read the settings file so the in-memory values match what was just written
+ self.agent.pull_conf_settings()
+ self.agent.wakeup()
# Agent daemon class. The names below are class-level defaults.
class CephadmAgent():
# default refresh interval; pull_conf_settings() overrides it per instance
# with int(config['refresh_period'])
loop_interval = 30
stop = False
- required_files = ['agent.json', 'keyring']
# File names that validate() requires in a config, and the only names that
# deploy_daemon_unit() will write into the daemon dir.
+ required_files = [
+ 'agent.json',
+ 'keyring',
+ 'root_cert.pem',
+ 'listener.crt',
+ 'listener.key',
+ ]
# Initialise per-agent state from the cephadm context, cluster fsid and daemon id.
# NOTE(review): only part of the constructor is visible in this excerpt.
def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] = ''):
self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
# first port run() probes for the listener; replaced by
# int(config['listener_port']) once pull_conf_settings() has run
+ self.starting_port = 14873
# target_ip / target_port / host start empty and are filled in by pull_conf_settings()
self.target_ip = ''
self.target_port = ''
self.host = ''
self.recent_iteration_index: int = 0
self.cached_ls_values: Dict[str, Dict[str, str]] = {}
# Check that every name in required_files is a key of `config`.
# Raises Error naming the first missing file; returns None otherwise.
# The `{}` default is only read here, never mutated, so with no argument
# the call fails on the first required file.
+ def validate(self, config: Dict[str, str] = {}) -> None:
+ # check for the required files
+ for fname in self.required_files:
+ if fname not in config:
+ raise Error('required file missing from config: %s' % fname)
+
# Write the agent's config files and its unit.run script into the daemon dir.
#
# `config` maps file name -> file contents. An empty or missing config raises
# Error('Agent needs a config'); validate() then raises Error if any required
# file is absent. Only names listed in required_files are written; any other
# key in `config` is skipped.
# NOTE(review): this excerpt may end before the method does -- confirm.
def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
if not config:
raise Error('Agent needs a config')
assert isinstance(config, dict)
+ self.validate(config)
# NOTE(review): no chmod/umask call is visible in this excerpt -- confirm
# where the "restricted permissions" mentioned below are applied.
# Create the required config files in the daemons dir, with restricted permissions
for filename in config:
- with open(os.path.join(self.daemon_dir, filename), 'w') as f:
- f.write(config[filename])
+ if filename in self.required_files:
+ with open(os.path.join(self.daemon_dir, filename), 'w') as f:
+ f.write(config[filename])
# unit.run receives whatever unit_run() returns
with open(os.path.join(self.daemon_dir, 'unit.run'), 'w') as f:
f.write(self.unit_run())
# Set self.event. Presumably this releases a wait on that event in the agent's
# main loop; the waiter is not visible in this excerpt.
def wakeup(self) -> None:
self.event.set()
# Load the JSON file at self.config_path and copy its settings onto the agent:
# target_ip, target_port, host (as-is), refresh_period -> loop_interval (int),
# listener_port -> starting_port (int), and device_enhanced_scan (compared
# case-insensitively against 'true').
# This logic was previously the start of run(); it was split out so it can be
# re-run when a new config arrives.
- def run(self) -> None:
+ def pull_conf_settings(self) -> None:
try:
with open(self.config_path, 'r') as f:
config = json.load(f)
self.target_ip = config['target_ip']
self.target_port = config['target_port']
self.loop_interval = int(config['refresh_period'])
- starting_port = int(config['listener_port'])
+ self.starting_port = int(config['listener_port'])
self.host = config['host']
use_lsm = config['device_enhanced_scan']
# NOTE(review): the body of this handler appears to be elided from the excerpt;
# as shown, the use_lsm check below would sit inside `except` -- confirm
# against the full file.
except Exception as e:
if use_lsm.lower() == 'true':
self.device_enhanced_scan = True
# Agent entry point (visible portion): load settings, then pick a free listener port.
# NOTE(review): the excerpt likely ends before the method does -- confirm.
+ def run(self) -> None:
+ self.pull_conf_settings()
+
try:
# Probe consecutive ports starting at self.starting_port; the first free one
# is stored, as a string, in self.listener_port.
# NOTE(review): range(1001) tries 1001 ports while the error message below
# says 1000 -- confirm which is intended.
for _ in range(1001):
- if not port_in_use(self.ctx, starting_port):
- self.listener_port = str(starting_port)
+ if not port_in_use(self.ctx, self.starting_port):
+ self.listener_port = str(self.starting_port)
break
# the probe advances self.starting_port in place
- starting_port += 1
+ self.starting_port += 1
# relies on self.listener_port having been initialised to a falsy value
# elsewhere (not visible here)
if not self.listener_port:
# after 1001 increments, subtracting 1001 recovers the first port tried
- raise Error(f'All 1000 ports starting at {str(starting_port - 1001)} taken.')
+ raise Error(f'All 1000 ports starting at {str(self.starting_port - 1001)} taken.')
# any failure above, including the Error raised just above, is re-raised as an Error
except Exception as e:
raise Error(f'Failed to pick port for agent to listen on: {e}')
import threading
import time
-# from orchestrator import OrchestratorError
from mgr_util import verify_tls_files
-from orchestrator import DaemonDescriptionStatus
+from orchestrator import DaemonDescriptionStatus, OrchestratorError
+from orchestrator._interface import daemon_type_to_service
from ceph.utils import datetime_now
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from datetime import datetime, timedelta
-from OpenSSL import crypto
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
-from typing import Any, Dict, List, Set, Tuple, TYPE_CHECKING
+from typing import Any, Dict, List, Set, Tuple, \
+ TYPE_CHECKING, Optional
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
def run(self) -> None:
try:
- self.ssl_certs.generate_root_cert()
+ try:
+ old_creds = self.mgr.get_store('cephadm_endpoint_credentials')
+ if not old_creds:
+ raise OrchestratorError('No old credentials for cephadm endpoint found')
+ old_creds_dict = json.loads(old_creds)
+ old_key = old_creds_dict['key']
+ old_cert = old_creds_dict['cert']
+ self.ssl_certs.load_root_credentials(old_cert, old_key)
+ except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
+ self.ssl_certs.generate_root_cert()
+
cert, key = self.ssl_certs.generate_cert()
self.key_tmp = tempfile.NamedTemporaryFile()
self.mgr.log.debug('Starting cherrypy engine...')
self.start_engine()
self.mgr.log.debug('Cherrypy engine started.')
+ cephadm_endpoint_creds = {
+ 'cert': self.ssl_certs.get_root_cert(),
+ 'key': self.ssl_certs.get_root_key()
+ }
+ self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds))
+ self.mgr._kick_serve_loop()
# wait for the shutdown event
self.cherrypy_shutdown_event.wait()
self.cherrypy_shutdown_event.clear()
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
# Send each host in `hosts` a message carrying its current agent counter and,
# when `new_config` is given and non-empty, that config under the 'config' key.
# One AgentMessageThread is started per host, addressed to the port cached in
# self.mgr.cache.agent_ports[host] (KeyError if the host has no cached port).
- def _request_agent_acks(self, hosts: Set[str], increment: bool = False) -> None:
+ def _request_agent_acks(self, hosts: Set[str], increment: bool = False, new_config: Optional[Dict[str, str]] = None) -> None:
for host in hosts:
# NOTE(review): as shown, `elif increment` directly follows `if increment`
# and can never run, so the counter would always be reset to 1. A condition
# line may have been elided from this excerpt -- confirm against the full file.
if increment:
self.mgr.cache.metadata_up_to_date[host] = False
self.mgr.cache.agent_counter[host] = 1
elif increment:
self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
+ payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]}
# 'config' is attached only for a truthy new_config (None or {} sends the counter alone)
+ if new_config:
+ payload['config'] = new_config
message_thread = AgentMessageThread(
- host, self.mgr.cache.agent_ports[host], {'counter': self.mgr.cache.agent_counter[host]}, self.mgr)
+ host, self.mgr.cache.agent_ports[host], payload, self.mgr)
message_thread.start()
def _agent_down(self, host: str) -> bool:
last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
host, agent.name())
if not last_config or last_deps != deps:
+ # if root cert is the dep that changed, we must use ssh to reconfig
+ # so it's necessary to check this one specifically
+ root_cert_match = False
+ try:
+ root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ if last_deps and root_cert in last_deps:
+ root_cert_match = True
+ except Exception:
+ pass
daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
- self.mgr._daemon_action(daemon_spec, action='reconfig')
+ # we need to know the agent port to try to reconfig w/ http
+ # otherwise there is no choice but a full ssh reconfig
+ if host in self.mgr.cache.agent_ports and root_cert_match:
+ daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
+ daemon_spec.daemon_type)].prepare_create(daemon_spec)
+ self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
+ self.mgr.cache.agent_counter[daemon_spec.host] = 1
+ self.mgr.agent_helpers._request_agent_acks(
+ hosts={daemon_spec.host},
+ increment=True,
+ new_config=daemon_spec.final_config
+ )
+ self.mgr.cache.update_daemon_config_deps(
+ daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now())
+ self.mgr.cache.save_host(daemon_spec.host)
+ else:
+ self.mgr._daemon_action(daemon_spec, action='reconfig')
return False
except Exception as e:
self.mgr.log.debug(
self.mgr = mgr
self.root_cert: Any
self.root_key: Any
- self.root_subj: Any
def generate_root_cert(self) -> Tuple[str, str]:
self.root_key = rsa.generate_private_key(
private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
)
- cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
+ cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
key_str = self.root_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
)
- cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')
+ cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
key_str = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
# Return the root certificate as a PEM-encoded str.
# Returns '' when the call raises AttributeError, e.g. if self.root_cert has
# not been assigned yet.
def get_root_cert(self) -> str:
try:
- return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
+ return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
except AttributeError:
return ''
# Return the root private key as a PEM-encoded str (TraditionalOpenSSL format,
# not encrypted). The output contains raw key material.
# Returns '' when the call raises AttributeError, e.g. if self.root_key has
# not been assigned yet.
def get_root_key(self) -> str:
try:
# the removed line passed the private key to a certificate-dumping call
- return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_key).decode('utf-8')
+ return self.root_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption(),
+ ).decode('utf-8')
except AttributeError:
return ''
+
def load_root_credentials(self, cert: str, priv_key: str) -> None:
    """Install an existing root certificate/key pair given as PEM strings.

    :param cert: PEM-encoded X.509 root certificate.
    :param priv_key: PEM-encoded, unencrypted private key for ``cert``.
    :raises OrchestratorError: if the certificate's validity period has ended.
    :raises ValueError: if either PEM blob cannot be parsed (raised by the
        ``cryptography`` loaders).

    ``self.root_cert`` and ``self.root_key`` are only assigned once both
    inputs have been parsed and the expiry check has passed, so a failure
    never leaves a half-updated pair behind.
    """
    # Function-scope import: the expiry check needs ``timezone`` in addition
    # to the ``datetime`` class this method already uses.
    from datetime import timezone

    given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())

    # ``not_valid_after`` is a naive datetime expressed in UTC. Comparing it
    # with ``datetime.now(None)`` (local wall-clock time) skews the check by
    # the host's UTC offset, so normalise to an aware UTC value first.
    not_after = given_cert.not_valid_after
    if not_after.tzinfo is None:
        not_after = not_after.replace(tzinfo=timezone.utc)
    if datetime.now(timezone.utc) >= not_after:
        raise OrchestratorError('Given cert is expired')

    given_key = serialization.load_pem_private_key(
        data=priv_key.encode('utf-8'), backend=default_backend(), password=None)

    self.root_cert = given_cert
    self.root_key = given_key