"""AioKem class for interacting with Kohler Energy Management System (KEM) API."""
from __future__ import annotations
import asyncio
import contextlib
import logging
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime, timedelta, tzinfo
from http import HTTPStatus
from typing import Any
import jwt
from aiohttp import (
ClientConnectionError,
ClientConnectorError,
ClientSession,
ClientTimeout,
ContentTypeError,
hdrs,
)
from multidict import CIMultiDict, istr
from yarl import URL
from aiokem.helpers import convert_number_abs, convert_timestamp, reverse_mac_address
from .exceptions import (
AuthenticationCredentialsError,
AuthenticationError,
CommunicationError,
ServerError,
)
from .message_logger import log_json_message
_LOGGER = logging.getLogger(__name__)
AUTHENTICATION_URL = URL("https://kohler-homeenergy.okta.com/oauth2/default/v1/token")
CLIENT_KEY = (
"MG9hMXFpY3BkYWdLaXdFekYxZDg6d3Raa1FwNlY1T09vMW9"
"PcjhlSFJHTnFBWEY3azZJaXhtWGhINHZjcnU2TWwxSnRLUE5obXdsMEN1MGlnQkVIRg=="
)
API_KEY = "pgH7QzFHJx4w46fI~5Uzi4RvtTwlEXp"
API_KEY_HDR = istr("apikey")
API_BASE = "https://api.hems.rehlko.com"
API_BASE_URL = URL(API_BASE)
ME_URL = URL(f"{API_BASE}/kem/api/v3/homeowner/me")
NOTIFICATIONS_URL = URL(f"{API_BASE}/kem/api/v3/notifications")
HOMES_URL = URL(f"{API_BASE}/kem/api/v3/homeowner/homes")
AUTH_HEADERS = CIMultiDict(
{
hdrs.ACCEPT: "application/json",
hdrs.AUTHORIZATION: f"Basic {CLIENT_KEY}",
hdrs.CONTENT_TYPE: "application/x-www-form-urlencoded",
}
)
DEFAULT_CLIENT_TIMEOUT = ClientTimeout(total=20)
RETRY_EXCEPTIONS = (
CommunicationError,
ServerError,
ClientConnectorError,
)
AUTHORIZATION_EXCEPTIONS = (AuthenticationError,)
[docs]
class AioKem:
"""AioKem class for interacting with Kohler Energy Management System (KEM) API."""
def __init__(self, session: ClientSession, home_timezone: tzinfo = UTC) -> None:
"""
Initialize the AioKem class.
Args:
session (ClientSession): An aiohttp ClientSession object.
home_timezone (tzinfo): The timezone used to convert local timestamps.
"""
self._token: str | None = None
self._refresh_token: str | None = None
self._session = session
self._token_expires_at: float = 0
self._token_expires_in: int = 0
self._retry_count: int = 0
self._retry_delays: list[int] = []
self._refresh_lock = asyncio.Lock()
self.refresh_token_callable: Callable[[str | None], Awaitable[None]] | None = (
None
)
self._timeout = DEFAULT_CLIENT_TIMEOUT
self._home_timezone = home_timezone
[docs]
def set_timeout(self, timeout: int) -> None:
"""
Set the timeout for the session.
Args:
timeout (int): Timeout in seconds.
"""
self._timeout = ClientTimeout(total=timeout)
_LOGGER.debug("Timeout set to %s seconds", timeout)
[docs]
def set_retry_policy(self, retry_count: int, retry_delays: list[int]) -> None:
"""
Set the retry policy for the session.
Args:
retry_count (int): Number of retries. Zero means no retries.
retry_delays (list[int]): Delay between retries in seconds for each retry.
"""
self._retry_count = retry_count
self._retry_delays = retry_delays
[docs]
def set_refresh_token_callback(
self, callback: Callable[[str | None], Awaitable[None]]
) -> None:
"""
Set the callback for refresh token updates.
Args:
callback (callable): Callback function to be called when the refresh
token updates.
The function should accept a single argument, which is the new
refresh token.
"""
self.refresh_token_callable = callback
[docs]
async def on_refresh_token_update(self, refresh_token: str | None) -> None:
"""Execute the registered callback."""
if self.refresh_token_callable:
try:
_LOGGER.debug("Calling refresh token callback")
await self.refresh_token_callable(refresh_token)
except Exception as e:
_LOGGER.error("Error in refresh token callback: %s", e)
async def _authentication_helper(self, data: dict[str, Any]) -> None:
"""Helper function for authentication."""
_LOGGER.debug("Sending authentication request to %s", AUTHENTICATION_URL)
try:
response = await self._session.post(
AUTHENTICATION_URL,
headers=AUTH_HEADERS,
data=data,
timeout=self._timeout,
)
response_data = await response.json()
except ClientConnectionError as e:
raise CommunicationError(f"Connection error: {e}") from e
except TimeoutError as e:
raise CommunicationError(f"Timeout error: {e}") from e
if _LOGGER.isEnabledFor(logging.DEBUG):
log_json_message(response_data)
if response.status != HTTPStatus.OK:
if response.status == HTTPStatus.BAD_REQUEST:
raise AuthenticationCredentialsError(
f"Invalid Credentials: "
f"{response_data.get('error_description', 'unknown')} "
f"Code {response.status}"
)
else:
raise AuthenticationError(
f"Authentication failed: "
f"{response_data.get('error_description', 'unknown')} "
f"Code {response.status}"
)
self._token = response_data.get("access_token")
if not self._token:
raise ServerError("Login failed: No access token received")
self._refresh_token = response_data.get("refresh_token")
if not self._refresh_token:
raise ServerError("Login failed: No refresh token received")
self._token_expires_in = response_data.get("expires_in")
self._token_expires_at = time.monotonic() + self._token_expires_in
_LOGGER.debug(
"Authentication successful. Token expires at %s",
datetime.now() + timedelta(seconds=self._token_expires_in),
)
[docs]
async def authenticate(
self, email: str, password: str, refresh_token: str | None = None
) -> None:
"""Login to the server."""
_LOGGER.debug("Authenticating user %s", email)
self.email = email
self.password = password
if refresh_token:
with contextlib.suppress(AuthenticationError):
await self.authenticate_with_refresh_token(refresh_token)
return
await self._authentication_helper(
{
"grant_type": "password",
"username": email,
"password": password,
"scope": "openid profile offline_access email",
}
)
await self.on_refresh_token_update(self._refresh_token)
[docs]
async def authenticate_with_refresh_token(self, refresh_token: str) -> None:
"""Login to the server using a refresh token."""
_LOGGER.debug("Authenticating with refresh token.")
await self._authentication_helper(
{
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "openid profile offline_access email",
}
)
await self.on_refresh_token_update(self._refresh_token)
[docs]
async def check_and_refresh_token(self) -> None:
"""Check if the token is expired and refresh it if necessary."""
_LOGGER.debug("Checking if token needs to be refreshed.")
if not self._token:
raise AuthenticationError("Not authenticated")
if time.monotonic() >= self._token_expires_at:
# Prevent reentry and refreshing token multiple times
async with self._refresh_lock:
if time.monotonic() >= self._token_expires_at:
_LOGGER.debug("Access token expired. Refreshing token.")
await self._authentication_helper(
{
"grant_type": "refresh_token",
"refresh_token": self._refresh_token,
"scope": "openid profile offline_access email",
}
)
# Execute callback outside of lock to avoid deadlock
await self.on_refresh_token_update(self._refresh_token)
async def _get_helper(self, url: URL) -> dict[str, Any] | list[dict[str, Any]]:
"""Helper function to get data from the API."""
headers = CIMultiDict(
{
API_KEY_HDR: API_KEY,
hdrs.AUTHORIZATION: f"bearer {self._token}",
}
)
_LOGGER.debug("Sending GET request to %s", url)
try:
response = await self._session.get(
url, headers=headers, timeout=self._timeout
)
except ClientConnectionError as e:
raise CommunicationError(f"Connection error: {e}") from e
except TimeoutError as e:
raise CommunicationError(f"Timeout error: {e}") from e
if response.status == HTTPStatus.OK:
try:
response_data = await response.json()
if _LOGGER.isEnabledFor(logging.DEBUG):
log_json_message(response_data)
_LOGGER.debug("Data successfully fetched from %s", url)
return response_data
except ContentTypeError as e:
raise CommunicationError(
f"Failed to parse response: {e} "
f"Content-Type: {response.headers.get(hdrs.CONTENT_TYPE)}"
f"Text: {await response.text()}"
) from e
response_data = await response.text()
if response.status == HTTPStatus.UNAUTHORIZED:
raise AuthenticationError(f"Unauthorized: {response_data}")
else:
raise ServerError(f"Status: {response.status} Response: {response_data}")
async def _retry_auth(self) -> bool:
"""Retry authentication."""
_LOGGER.debug("Retrying authentication")
try:
await self.authenticate(email=self.email, password=self.password)
except AuthenticationError as error:
_LOGGER.error("Authentication failed: %s", error)
return False
return True
async def _retry_get_helper(
self, url: URL
) -> dict[str, Any] | list[dict[str, Any]]:
"""Retry GET request with exponential backoff."""
await self.check_and_refresh_token()
last_error = None
for attempt in range(self._retry_count + 1):
if attempt > 0:
await asyncio.sleep(self._retry_delays[attempt - 1])
try:
return await self._get_helper(url)
except RETRY_EXCEPTIONS as error:
last_error = error
_LOGGER.debug("Retryable exception: %s", error)
except AUTHORIZATION_EXCEPTIONS as error:
_LOGGER.debug("Authorization error communicating with KEM: %s", error)
last_error = error
if not await self._retry_auth():
raise AuthenticationError("Retry authentication failed") from error
_LOGGER.error(
"Failed to get data after %s retries, error %s", attempt, last_error
)
raise CommunicationError(
f"Failed to get data after {attempt} retries, error {last_error}"
) from last_error
[docs]
async def get_homeowner(self) -> dict[str, Any]:
"""Get homeowner information."""
_LOGGER.debug("Fetching homeowner information.")
response = await self._retry_get_helper(ME_URL)
if not isinstance(response, dict):
raise TypeError(
f"Expected an object, but got a different type {type(response)}"
)
return response
[docs]
async def get_notifications(self) -> list[dict[str, Any]]:
"""Get list of notifications."""
_LOGGER.debug("Fetching notifications.")
response = await self._retry_get_helper(NOTIFICATIONS_URL)
if not isinstance(response, list):
raise TypeError(
"Expected a list of notifications, but got a different type "
f"{type(response)}"
)
return response
[docs]
async def get_homes(self) -> list[dict[str, Any]]:
"""Get the list of homes."""
_LOGGER.debug("Fetching list of homes.")
response = await self._retry_get_helper(HOMES_URL)
if not isinstance(response, list):
raise TypeError(
f"Expected a list of homes, but got a different type {type(response)}"
)
for homes in response:
for devices in homes.get("devices", []):
# The mac address is reversed in the response
if mac_address := devices.get("macAddress"):
devices["macAddress"] = reverse_mac_address(mac_address)
return response
[docs]
async def get_generator_data(self, generator_id: int) -> dict[str, Any]:
"""Get generator data for a specific generator."""
_LOGGER.debug("Fetching generator data for generator ID %d", generator_id)
url = API_BASE_URL.with_path(f"/kem/api/v3/devices/{generator_id}")
response = await self._retry_get_helper(url)
if not isinstance(response, dict):
raise TypeError(
"Expected a dictionary for generator data, "
f"but got a different type {type(response)}"
)
# The mac address is reversed in the response
if mac_address := response.get("device", {}).get("macAddress"):
response["device"]["macAddress"] = reverse_mac_address(mac_address)
# These timestamps are local time without timezone info
convert_timestamp(
response.get("exercise", {}), "nextStartTimestamp", self._home_timezone
)
for k in ("lastMaintenanceTimestamp", "nextMaintenanceTimestamp"):
convert_timestamp(response.get("device", {}), k, self._home_timezone)
for measurement in ("generatorLoadW", "generatorLoadPercent"):
convert_number_abs(response, measurement)
return response
[docs]
async def get_alerts(self, generator_id: int) -> list[dict[str, Any]]:
"""Get list of alerts for a generator."""
_LOGGER.debug("Fetching alerts for generator ID %d", generator_id)
url = API_BASE_URL.with_path(f"/kem/api/v3/devices/{generator_id}/alerts")
response = await self._retry_get_helper(url)
if not isinstance(response, list):
raise TypeError(
f"Expected a list of alerts, but got a different type {type(response)}"
)
return response
[docs]
async def get_events(self, generator_id: int) -> list[dict[str, Any]]:
"""Get list of events for a generator."""
_LOGGER.debug("Fetching events for generator ID %d", generator_id)
url = API_BASE_URL.with_path(f"/kem/api/v3/devices/{generator_id}/events")
response = await self._retry_get_helper(url)
if not isinstance(response, list):
raise TypeError(
f"Expected a list of events, but got a different type {type(response)}"
)
return response
[docs]
async def get_maintenance_notes(self, generator_id: int) -> list[dict[str, Any]]:
"""Get list of maintenance_notes for a generator."""
_LOGGER.debug("Fetching maintenance notes for generator ID %d", generator_id)
url = API_BASE_URL.with_path(
f"/kem/api/v3/devices/{generator_id}/maintenance_notes"
)
response = await self._retry_get_helper(url)
if not isinstance(response, list):
raise TypeError(
"Expected a list of maintenance notes, but got a different type "
f"{type(response)}"
)
return response
[docs]
async def close(self) -> None:
"""Close the session."""
_LOGGER.debug("Closing AioKem.")
self.refresh_token_callable = None
self._session = None
self._token = None
self._refresh_token = None
[docs]
def get_token_subject(self) -> str | None:
"""Returns the subject of the JWT token, used as unique id for the user."""
if not self._token:
raise AuthenticationError("Not authenticated")
# Decode the JWT token and extract the subject
try:
token_data = jwt.decode(self._token, options={"verify_signature": False})
except jwt.DecodeError as e:
_LOGGER.error("Failed to decode JWT token: %s", e)
return None
return token_data.get("sub", None)