def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
return self.upgrade.upgrade_status()
+ @handle_orch_error
+ def upgrade_ls(self, image: Optional[str], tags: bool) -> List[str]:
+ return self.upgrade.upgrade_ls(image, tags)
+
@handle_orch_error
def upgrade_start(self, image: str, version: str) -> str:
if self.inventory.get_host_with_state("maintenance"):
--- /dev/null
+import requests
+from typing import List, Dict, Tuple
+from requests import Response
+
+
+class Registry:
+
+ def __init__(self, url: str):
+ self._url: str = url
+
+ @property
+ def api_domain(self) -> str:
+ if self._url == 'docker.io':
+ return 'registry-1.docker.io'
+ return self._url
+
+ def get_token(self, response: Response) -> str:
+ realm, params = self.parse_www_authenticate(response.headers['Www-Authenticate'])
+ r = requests.get(realm, params=params)
+ r.raise_for_status()
+ ret = r.json()
+ if 'access_token' in ret:
+ return ret['access_token']
+ if 'token' in ret:
+ return ret['token']
+ raise ValueError(f'Unknown token reply {ret}')
+
+ def parse_www_authenticate(self, text: str) -> Tuple[str, Dict[str, str]]:
+ # 'Www-Authenticate': 'Bearer realm="https://auth.docker.io/token",service="registry.docker.io",scope="repository:ceph/ceph:pull"'
+ r: Dict[str, str] = {}
+ for token in text.split(','):
+ key, value = token.split('=', 1)
+ r[key] = value.strip('"')
+ realm = r.pop('Bearer realm')
+ return realm, r
+
+ def get_tags(self, image: str) -> List[str]:
+ tags = []
+ headers = {'Accept': 'application/json'}
+ url = f'https://{self.api_domain}/v2/{image}/tags/list'
+ while True:
+ r = requests.get(url, headers=headers)
+ if r.status_code == 401:
+ if 'Authorization' in headers:
+ raise ValueError('failed authentication')
+ token = self.get_token(r)
+ headers['Authorization'] = f'Bearer {token}'
+ continue
+ r.raise_for_status()
+
+ new_tags = r.json()['tags']
+ tags.extend(new_tags)
+
+ if 'Link' not in r.headers:
+ break
+
+ # strip < > brackets off and prepend the domain
+ url = f'https://{self.api_domain}' + r.headers['Link'].split(';')[0][1:-1]
+ continue
+
+ return tags
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple
import orchestrator
+from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
return None
+ def upgrade_ls(self, image: Optional[str], tags: bool) -> List[str]:
+ if image:
+ reg_name, image = image.split('/', 1)
+ else:
+ reg_name, image = self.mgr.container_image_base.split('/', 1)
+ self.mgr.log.info(f'reg_name {reg_name} image {image}')
+ reg = Registry(reg_name)
+ versions = []
+ ls = reg.get_tags(image)
+ if not tags:
+ for t in ls:
+ if t[0] != 'v':
+ continue
+ v = t[1:].split('.')
+ if len(v) != 3:
+ continue
+ if '-' in v[2]:
+ continue
+ versions.append('.'.join(v))
+ ls = sorted(
+ versions,
+ key=lambda k: list(map(int, k.split('.'))),
+ reverse=True
+ )
+ else:
+ ls = sorted(ls)
+ return ls
+
def upgrade_start(self, image: str, version: str) -> str:
if self.mgr.mode != 'root':
raise OrchestratorError('upgrade is not supported in %s mode' % (
def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
+ def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[List[str]]:
+ raise NotImplementedError()
+
def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
+ @_cli_read_command('orch upgrade ls')
+ def _upgrade_ls(self,
+ image: Optional[str] = None,
+ tags: bool = False) -> HandleCommandResult:
+ """Check for available versions (or tags) we can upgrade to"""
+ completion = self.upgrade_ls(image, tags)
+ r = raise_if_exception(completion)
+ out = json.dumps(r, indent=4)
+ return HandleCommandResult(stdout=out)
+
@_cli_write_command('orch upgrade status')
def _upgrade_status(self) -> HandleCommandResult:
"""Check service versions vs available and target containers"""