SKA SDP Data Queue Classes

A collection of classes that provides access to Kafka.

enum ska_sdp_dataqueues.data_queue.Encoding(value)

The encode/decode method to use

Member Type:

str

Valid values are as follows:

UTF8 = <Encoding.UTF8: 'utf-8'>
ASCII = <Encoding.ASCII: 'ascii'>
MSGPACK_NUMPY = <Encoding.MSGPACK_NUMPY: 'msgpack_numpy'>
NPY = <Encoding.NPY: 'npy'>
XARRAY = <Encoding.XARRAY: 'xarray'>
JSON = <Encoding.JSON: 'json'>
BYTES = <Encoding.BYTES: 'bytes'>

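
Because the members are strings, a member can be looked up from its value, which can be handy when the encoding name comes from configuration. The sketch below uses a local stand-in enum that mirrors the members listed above rather than importing the package, and it assumes the real enum behaves like a standard Python str-valued Enum. The helper name encoding_from_name is hypothetical.

```python
from enum import Enum


class Encoding(str, Enum):
    """Local stand-in mirroring the documented members (not the real class)."""

    UTF8 = "utf-8"
    ASCII = "ascii"
    MSGPACK_NUMPY = "msgpack_numpy"
    NPY = "npy"
    XARRAY = "xarray"
    JSON = "json"
    BYTES = "bytes"


def encoding_from_name(name: str) -> Encoding:
    """Return the member whose value equals `name`, or raise ValueError."""
    return Encoding(name)


selected = encoding_from_name("msgpack_numpy")
print(selected.name, selected.value)
```
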
class ska_sdp_dataqueues.data_queue.DataQueueProducer(server: str | list[str], message_max_bytes: int = 1048576, topic: str | None = None, encoding: Encoding | None = None)

A Producer object makes a connection to a Kafka server and allows messages to be pushed to specified topics, and optionally specific partitions.

The maximum message size is configurable and defaults to 1 MiB (2**20 bytes), as recommended by Kafka. Sending messages larger than this will increase latency linearly.

Parameters:
  • server – The address(es) for the Kafka Broker to query for metadata and set up the connection.

  • topic – The default Kafka topic to write messages to

  • message_max_bytes – The maximum size, in bytes, of a pushed message

  • encoding – The default encoding to use (or None to specify for each message)
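
As a rough pre-flight check, a caller could compare the encoded payload size against the limit before sending. The helper below is hypothetical and only measures the payload itself; the size Kafka enforces also includes record overhead, so treat the result as an approximation.

```python
DEFAULT_MESSAGE_MAX_BYTES = 2**20  # 1 MiB, the documented default (1048576)


def exceeds_limit(payload: bytes, limit: int = DEFAULT_MESSAGE_MAX_BYTES) -> bool:
    """Return True if the raw payload alone is larger than `limit` bytes."""
    return len(payload) > limit


small = "hello".encode("utf-8")
large = bytes(DEFAULT_MESSAGE_MAX_BYTES + 1)  # one byte over the default

print(exceeds_limit(small))
print(exceeds_limit(large))
```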

async astart()

Start this producer

async astop()

Stop this producer

async send(data: tuple[str, bytes] | ndarray | dict[str, object] | int | float | bool | str, encoding: Encoding | None = None, schema: AnyDataclass | None = None, validate: bool = False, topic: str | None = None, partition: int | None = None) RecordMetadata

Sends data to the Kafka topic

Parameters:
  • data – Data to send to Kafka topic

  • encoding – Encoding type

  • schema – Schema to validate data (either xradio DatasetSchema or a dataclass)

  • validate – Validate using schema?

  • topic – The topic to send data to. Optional. If not provided, the default topic (from the constructor) will be used.

  • partition – The explicit partition to send data to. Optional. If not provided, partitions will be set automatically.

Returns:

AIOKafka RecordMetadata
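
A minimal sketch of the producer lifecycle, using only the astart(), send() and astop() methods documented above. The helper publish_all is hypothetical, as are the broker address "localhost:9092" and the topic "example-topic"; main() assumes the package is installed and a broker is reachable, and that a str payload is accepted with Encoding.UTF8.

```python
import asyncio  # used to drive main(): asyncio.run(main())


async def publish_all(producer, messages, topic=None):
    """Start the producer, send each message, and always stop it afterwards."""
    await producer.astart()
    try:
        results = []
        for message in messages:
            # topic=None falls back to the producer's default topic.
            results.append(await producer.send(message, topic=topic))
        return results
    finally:
        # Runs even if a send fails, so the connection is not left open.
        await producer.astop()


async def main():
    # Imported here so the helper above can be used without the package.
    from ska_sdp_dataqueues.data_queue import DataQueueProducer, Encoding

    producer = DataQueueProducer(
        "localhost:9092",  # hypothetical broker address
        topic="example-topic",  # hypothetical default topic
        encoding=Encoding.UTF8,
    )
    for metadata in await publish_all(producer, ["hello", "world"]):
        print(metadata)
```

To try this against a live broker, call asyncio.run(main()). The try/finally block is a design choice rather than a library requirement: it keeps astop() paired with astart() when a send raises.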

class ska_sdp_dataqueues.data_queue.DataQueueConsumer(server: str | list[str], topics: list[str] | None = None, encoding: Encoding = Encoding.BYTES, set_group_id: bool = True, auto_offset_reset='earliest', set_consumer_offsets: bool = True)

A Consumer object makes a connection to a Kafka server and allows messages to be streamed from specified topics, and optionally specific partitions.

Parameters:
  • server – The address(es) for the Kafka Broker to query for metadata and set up the connection.

  • topics – The Kafka topics to stream messages from.

  • encoding – The encoding to use (default Encoding.BYTES); use None to get back bytes.

  • set_group_id – Whether to set a group ID or not (based on server and topics)

  • auto_offset_reset – Where the consumer should start reading from: 'earliest' or 'latest'

  • set_consumer_offsets – Whether to set the consumer offsets.

async astart()

Start this consumer and adjust its offsets

async astop()

Stop this consumer

update_topics(topics: list[str]) None

Update the list of topics that can be consumed.

Note this will replace all topics subscribed to.

assign_topics(topic_partitions: list[tuple[str, int]]) None

Update the list of topics (and partitions) that can be consumed.

Note this will replace all topics subscribed to.

Example call: consumer.assign_topics([("topic-name", 0), ("topic-name", 3)])

This would then consume messages from topic-name using only partitions 0 and 3.
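
The list of (topic, partition) pairs passed to assign_topics can be built programmatically. The two helpers below are hypothetical conveniences, not part of the package; restrict_to_partitions accepts any object that exposes the documented assign_topics method.

```python
def partitions_of(topic: str, partitions: list[int]) -> list[tuple[str, int]]:
    """Pair one topic name with each requested partition number."""
    return [(topic, partition) for partition in partitions]


def restrict_to_partitions(consumer, topic: str, partitions: list[int]) -> None:
    """Replace the consumer's assignment with the given partitions of `topic`."""
    consumer.assign_topics(partitions_of(topic, partitions))


print(partitions_of("topic-name", [0, 3]))
```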

class ska_sdp_dataqueues.data_queue.DataQueueAdmin(server: str | list[str])

An Admin object allows privileged access to the cluster. It can be used to create new topics/partitions and to list the current topics/partitions.

Parameters:

server – The address(es) for the Kafka Broker to query for metadata and set up the connection.

async list_topics() list[dict]

List the currently created topics

async create_topics(topics: list[str] | list[tuple[str, int]], topic_configuration: dict | None = None)

Create a list of topics, or create topics with partition counts.

topics should either be:

  • topics=["topic1", "topic2"]

  • topics=[("topic1", 1), ("topic2", 10)]

The two forms can be mixed; in most cases the first should be used.
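
Since create_topics accepts two shapes of entry, a caller may want to check a list before passing it on. The sketch below is a hypothetical validator, not part of the package: it accepts a plain name or a (name, partition_count) pair and, as an assumption of this sketch, rejects partition counts below 1. create_validated works with any object exposing the documented async create_topics method.

```python
def validate_topic_specs(topics: list) -> list:
    """Check each entry is a name or a (name, partition_count) pair."""
    for spec in topics:
        if isinstance(spec, str):
            continue
        is_pair = (
            isinstance(spec, tuple)
            and len(spec) == 2
            and isinstance(spec[0], str)
            and isinstance(spec[1], int)
            and not isinstance(spec[1], bool)  # bool is a subclass of int
            and spec[1] > 0  # assumption: at least one partition
        )
        if not is_pair:
            raise ValueError(f"Unsupported topic spec: {spec!r}")
    return topics


async def create_validated(admin, topics: list) -> None:
    """Validate the specs, then pass them to the admin object unchanged."""
    await admin.create_topics(validate_topic_specs(topics))
```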

class ska_sdp_dataqueues.data_queue.DeprecatedAwaitable(value, message: str)

Stub Awaitable that raises DeprecationWarning when awaited.
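
The general pattern can be sketched as follows. This is an illustrative stand-in, not the package's implementation: the class name WarnOnAwait is hypothetical, and the sketch assumes the warning is issued through warnings.warn and that the wrapped value is then returned to the awaiting caller.

```python
import asyncio
import warnings


class WarnOnAwait:
    """Illustrative awaitable that warns when awaited, then yields its value."""

    def __init__(self, value, message: str):
        self._value = value
        self._message = message

    def __await__(self):
        # Issue the deprecation warning at the moment the object is awaited.
        warnings.warn(self._message, DeprecationWarning, stacklevel=2)
        return self._value
        yield  # unreachable, but makes __await__ a generator as required


async def demo():
    return await WarnOnAwait(5, "awaiting this call is deprecated")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = asyncio.run(demo())

print(result, [str(w.message) for w in caught if w.category is DeprecationWarning])
```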