Source code for configalchemy.contrib.apollo

import json
import logging
import threading
import time
from http import HTTPStatus
from typing import Dict

import requests

from configalchemy import BaseConfig, ConfigType

time_counter = time.time


class ConfigException(Exception):
    ...


logger = logging.getLogger(__name__)


[docs]class ApolloBaseConfig(BaseConfig): CONFIGALCHEMY_ENABLE_FUNCTION = True APOLLO_USING_CACHE = False APOLLO_SERVER_URL = "" APOLLO_APP_ID = "" APOLLO_CLUSTER = "default" APOLLO_NAMESPACE = "application" APOLLO_EXTRA_NAMESPACE = "" APOLLO_EXTRA_NAMESPACE_PRIORITY = 9 APOLLO_LONG_POLL_TIMEOUT = 80 def __init__(self): self.apollo_notification_map: Dict[str, ConfigType] = {} super().__init__() def start_long_poll(self): logger.info("start long poll") thread = threading.Thread(target=self.long_poll) thread.daemon = True thread.start() return thread def _access_config_by_namespace(self, namespace: str) -> ConfigType: route = "configs" if self.APOLLO_USING_CACHE: route = "configfiles" url = ( f"{self.APOLLO_SERVER_URL}/{route}/{self.APOLLO_APP_ID}/" f"{self.APOLLO_CLUSTER}/{namespace}" ) logger.info(f"Access apollo server url: {url}") response = requests.get(url) if response.ok: data = response.json() self.apollo_notification_map.setdefault(data["namespaceName"], {"id": -1}) self.apollo_notification_map[data["namespaceName"]]["data"] = data.get( "configurations", {} ) logger.debug(f"Got from apollo: {data}") return data.get("configurations", {}) else: raise ConfigException(f"loading config failed: {url}") def configuration_function(self) -> ConfigType: self.from_mapping( self._access_config_by_namespace(self.APOLLO_NAMESPACE), priority=self.CONFIGALCHEMY_FUNCTION_VALUE_PRIORITY, ) for namespace in self.APOLLO_EXTRA_NAMESPACE.split(","): if namespace: self.from_mapping( self._access_config_by_namespace(namespace), priority=self.APOLLO_EXTRA_NAMESPACE_PRIORITY, ) return {} def long_poll_from_apollo(self): url = f"{self.APOLLO_SERVER_URL}/notifications/v2/" notifications = [] for key, value in self.apollo_notification_map.items(): notifications.append({"namespaceName": key, "notificationId": value["id"]}) r = requests.get( url=url, params={ "appId": self.APOLLO_APP_ID, "cluster": self.APOLLO_CLUSTER, "notifications": json.dumps(notifications, ensure_ascii=False), }, timeout=self.APOLLO_LONG_POLL_TIMEOUT, ) if r.status_code == HTTPStatus.NOT_MODIFIED: logger.info("Apollo No change, loop...") elif r.status_code == HTTPStatus.OK: data = r.json() for entry in data: logger.info( "%s has changes: notificationId=%d" % (entry["namespaceName"], entry["notificationId"]) ) namespace = entry["namespaceName"] if namespace == self.APOLLO_NAMESPACE: self.from_mapping( self._access_config_by_namespace(namespace), priority=self.CONFIGALCHEMY_FUNCTION_VALUE_PRIORITY, ) else: self.from_mapping( self._access_config_by_namespace(namespace), priority=self.APOLLO_EXTRA_NAMESPACE_PRIORITY, ) self.apollo_notification_map[entry["namespaceName"]]["id"] = entry[ "notificationId" ] else: # pragma: no cover raise ConfigException(f"{url} : unexpected status {r.status_code}") def long_poll(self): while True: try: logger.debug("start apollo configuration long poll") self.long_poll_from_apollo() except ConfigException: time.sleep(5)