"""DSN (Data Service Name) type definitions."""
from __future__ import annotations
from typing import Any
from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler
from pydantic.json_schema import JsonSchemaValue
from pydantic_core import Url, core_schema
[docs]
class KafkaUrl(Url):
    """Kafka client specific URL for connecting to bootstrap servers.

    Supports initialization using either a URL or bootstrap address.

    When validated through pydantic, only the ``kafka`` scheme is allowed,
    a host is required, and the URL must not carry a username, password,
    path, query or fragment. A value that is not a valid ``kafka://`` URL
    is retried with ``kafka://`` prepended, so a plain ``host:port``
    bootstrap address is accepted as well.

    >>> from ska_sdp_config.entity.common import KafkaUrl
    >>> from pydantic import TypeAdapter
    >>> assert TypeAdapter(KafkaUrl).validate_python(
    ...     "kafka://localhost:9092"
    ... ).bootstrap_address == "localhost:9092"
    >>> assert TypeAdapter(KafkaUrl).validate_python(
    ...     "localhost:9092"
    ... ).bootstrap_address == "localhost:9092"
    """

    # Port used when a URL does not specify one; shared by the validation
    # schema and by ``bootstrap_address``.
    _DEFAULT_PORT = 9092

    def __new__(cls, uri: str) -> KafkaUrl:
        """Create the URL object from its string form."""
        return super().__new__(cls, uri)

    @property
    def bootstrap_address(self) -> str:
        """Bootstrap address format used by python Consumer and Producers.

        :returns: ``"<host>:<port>"``; the default port is used when the
            URL was built without an explicit one.
        """
        # A directly constructed KafkaUrl("kafka://host") has no port;
        # fall back to the schema default instead of rendering "host:None".
        port = self.port if self.port is not None else self._DEFAULT_PORT
        return f"{self.host}:{port}"

    @classmethod
    def __validate(cls, dsn: Any) -> KafkaUrl:
        """Convert ``dsn`` to a KafkaUrl and reject unsupported components.

        :param dsn: an existing KafkaUrl, or any object whose ``str()``
            form is a URL
        :returns: the KafkaUrl instance
        :raises AssertionError: if the URL contains a username, password,
            path, query or fragment
        """
        if not isinstance(dsn, KafkaUrl):
            dsn = KafkaUrl(str(dsn))

        # Raise explicitly rather than using ``assert`` statements: those
        # are stripped when Python runs with -O, which would silently
        # disable these checks. AssertionError is kept as the exception
        # type so the reported errors stay the same as before.
        for component in (
            "username",
            "password",
            "path",
            "query",
            "fragment",
        ):
            value = getattr(dsn, component)
            if value is not None:
                raise AssertionError(
                    f"{component} not supported in kafka url: {value}"
                )
        return dsn

    @classmethod
    def __url_conversion(cls, uri: str) -> str:
        """Prefix a bare bootstrap address with the ``kafka://`` scheme."""
        return "kafka://" + uri

    @classmethod
    def __get_pydantic_core_schema__(
        cls, _source_type: Any, handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        """Build the pydantic core schema used to validate a KafkaUrl.

        The schema is a union of two chains: the input is first treated as
        a ``kafka://`` URL; if that fails, it is treated as a string that
        gets ``kafka://`` prepended and is then validated the same way.
        Serialization uses the string form.

        :param _source_type: unused
        :param handler: pydantic schema handler
        :returns: the core schema
        """

        def url_steps() -> list:
            # Fresh schema objects per call so the two union branches do
            # not share the same dictionaries.
            return [
                core_schema.url_schema(
                    host_required=True,
                    default_port=cls._DEFAULT_PORT,
                    allowed_schemes=["kafka"],
                ),
                core_schema.no_info_plain_validator_function(cls.__validate),
            ]

        schema = core_schema.union_schema(
            [
                core_schema.chain_schema(url_steps()),
                # for backwards compatibility, try prepending a scheme
                core_schema.chain_schema(
                    [
                        core_schema.str_schema(),
                        core_schema.no_info_plain_validator_function(
                            cls.__url_conversion
                        ),
                        *url_steps(),
                    ]
                ),
            ],
            serialization=core_schema.to_string_ser_schema(),
        )
        # NOTE(review): the handler is given the ready-made schema rather
        # than a source type; presumably pydantic passes an existing
        # schema through unchanged - confirm against the pydantic version
        # in use.
        return handler(schema)

    @classmethod
    def __get_pydantic_json_schema__(
        cls,
        _core_schema: core_schema.CoreSchema,
        _handler: GetJsonSchemaHandler,
    ) -> JsonSchemaValue:
        """Return a fixed JSON schema: a non-empty string in URI format.

        :param _core_schema: unused
        :param _handler: unused
        :returns: the JSON schema
        """
        return {
            "format": "uri",
            "minLength": 1,
            "type": "string",
        }