Connector Catalog

Connector Components in DIANA System

Metadata Flow Through the Pipeline

The Diana platform uses metadata to track and manage data as it flows through the integration pipeline. As an event moves through the Diana system, it accumulates metadata headers at several processing stages. Together these headers provide the context for data lineage, routing decisions, filtering operations, and processing logic.

  • Source Stage: initial headers are attached when data is ingested by a source connector.

  • Transformation Stage: additional headers may be added during data transformation, and existing metadata may be modified by the transformation logic.

  • Sink Stage: final metadata is applied before the data reaches its destination.

This layered approach preserves traceability and context throughout the data journey and enables routing and filtering based on metadata.

Data Sources

Supported Infrastructure | Interface | Capabilities
--- | --- | ---
HTTP clients (Basic Auth/OAuth) | Data Ingest API (Data Push) | Generic ingest, metadata via HTTP headers, max 15MB per upload (see API documentation); requires a Bosch CIAM tenant, client and secret
Azure Blob Storage Account | Azure Blob Store | Supports flat files and Data Lake Gen2; deletes files after ingest
ActiveMQ, IBM MQ, RabbitMQ, Azure Event Hub, Solace | AMQP | AMQP 1.0, multiple queues, event metadata
Apache Pulsar | Pulsar | Multiple topics, event metadata
Azure Event Hub Kafka Mode, AWS MSK Kafka Mode, Native Kafka Broker | Kafka | Kafka headers, event metadata
MQTT | MQTT | MQTT 3.1.1/5.0, multiple topics, QoS, auto reconnect
FTP server | FTP | SFTP file discovery, mtime requirements
Geotab API | Geotab | Pulls Geotab feeds, keeps state in file system

Details

The following metadata fields are present on every event entering the system.

Field | Name | Type | Description
--- | --- | --- | ---
dianaingestid | Ingest ID | String | Unique identifier for the ingestion process
dianaingesttimestamp | Ingest Timestamp | String/OffsetDateTime | Timestamp when the event was ingested into the system; auto-generated if not present
dianadatasize | Data Size | String/Integer | Size of the event payload in bytes; calculated from the actual data if not present
dianaproducertenant | Producer Tenant | String | Tenant identifier of the event producer; set from configuration
dianadatastreamid | Data Stream ID | String | Identifier for the data stream; set from sidecar configuration
dianamd5hash | MD5 Hash | String | MD5 hash of the original message payload, calculated before compression or transformation, for data integrity verification

Note

Default metadata values are only set when fields are missing, preserving existing metadata through multiple processing stages while ensuring all mandatory fields are present for downstream components.
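The defaulting rule in this note can be pictured with a short Python sketch. It assumes event headers arrive as a plain dict and the payload as bytes; the function name, the ISO 8601 timestamp text and the hex encoding of the MD5 hash are assumptions, since the exact encodings are not specified here. dianaingestid is left alone because its generation is not documented.

```python
import hashlib
from datetime import datetime, timezone


def apply_default_metadata(headers, payload, producer_tenant, data_stream_id):
    """Return a copy of `headers` in which missing Diana fields are filled in.

    Existing values are never overwritten (dict.setdefault), so metadata set
    by an earlier stage survives later stages.
    """
    result = dict(headers)
    # Assumption: ISO 8601 text, as produced for a Java OffsetDateTime.
    result.setdefault("dianaingesttimestamp", datetime.now(timezone.utc).isoformat())
    # Size in bytes of the payload as received.
    result.setdefault("dianadatasize", str(len(payload)))
    # Both values come from configuration in the real system.
    result.setdefault("dianaproducertenant", producer_tenant)
    result.setdefault("dianadatastreamid", data_stream_id)
    # MD5 of the original payload, before compression or transformation.
    # Assumption: hex encoding.
    result.setdefault("dianamd5hash", hashlib.md5(payload).hexdigest())
    return result
```

Because setdefault never overwrites, running the same function again at a later stage leaves the values set by the first stage intact.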

AMQP

Required Configuration:

  • Broker URL (e.g. amqp://my-amqp-broker.amqp:5672)

  • Topic(s) for data retrieval

  • Password Authentication:

    • Username

    • Password

  • Certificate Authentication:

    • PKCS#12 or PEM certificate file

    • Certificate password

Capabilities:

  • Supports protocol AMQP 1.0

  • Can read from multiple queues of the same broker

  • Supports AMQP headers as event metadata

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
queue | Queue | String | Name of the AMQP queue from which the message was consumed
Custom Extensions | Custom JMS Properties | Various | Application-defined message properties (excluding those with the JMSX prefix)
content-type | Content Type | String | application/octet-stream

System properties with JMSX prefix are filtered out to maintain clean metadata. Properties with null values are excluded.
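For the AMQP source, the property handling described above amounts to a filter over the message's application properties. A minimal Python sketch, assuming the properties are already available as a dict (the function name is hypothetical):

```python
def amqp_properties_to_metadata(queue, properties):
    """Build source metadata from AMQP application properties.

    Drops JMSX-prefixed system properties and properties whose value is None,
    then adds the queue name and the default content type.
    """
    metadata = {
        key: value
        for key, value in properties.items()
        if not key.startswith("JMSX") and value is not None
    }
    # Assumption: the fixed fields are set last, so they win over any
    # same-named application property.
    metadata["queue"] = queue
    metadata["content-type"] = "application/octet-stream"
    return metadata
```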

Kafka

Required Configuration:

  • Bootstrap Server URL (e.g. my-kafka-cluster.kafka:9092)

  • Topic(s) for data retrieval

  • Auth Mechanism (SASL/PLAINTEXT)

  • Profiles: sasl, mtls, oauth, aws, eventhub

  • SASL Authentication:

    • Username

    • Password

  • mTLS Authentication:

    • PKCS#12 or PEM certificate file

    • Certificate password

  • OAuth Authentication (Generic, EventHub, AWS):

    • Client Id

    • Client Secret

    • Specific details for provider

Capabilities:

  • Supports OAuth Authentication (generic, Azure EventHub, AWS MSK)

  • Supports usage of mTLS certificates

  • Supports Kafka headers as event metadata

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
topic | Topic | String | Name of the Kafka topic from which the message was consumed
kafkakey | Kafka Key | String | Kafka message key (optional, only when a key is present)
Custom Extensions | Kafka Headers | Various | All Kafka message headers, with sanitized values
content-type | Content Type | String | application/octet-stream

Kafka message key is only included when present in the original message. All Kafka headers are included with values sanitized based on the active profile (standard Kafka vs EventHub).
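The Kafka source's metadata mapping can be sketched as follows. The headers argument uses the list of (name, bytes) pairs that the kafka-python and confluent-kafka clients expose on consumed records. The sanitization step (UTF-8 decoding, dropping non-printable characters) is a hypothetical stand-in: the profile-specific rules for standard Kafka versus EventHub are not documented here and are not modeled.

```python
def kafka_record_to_metadata(topic, key, headers):
    """Build source metadata for one consumed Kafka record.

    `key` may be None or bytes; `headers` may be None or a list of
    (name, bytes) pairs.
    """
    metadata = {"topic": topic, "content-type": "application/octet-stream"}
    if key is not None:
        # The key is optional and only attached when the record has one.
        metadata["kafkakey"] = key.decode("utf-8", errors="replace")
    for name, raw in headers or []:
        if raw is None:
            continue
        # Hypothetical sanitization: decode as UTF-8 (replacing invalid
        # bytes) and drop non-printable characters such as newlines.
        text = raw.decode("utf-8", errors="replace")
        metadata[name] = "".join(ch for ch in text if ch.isprintable())
    return metadata
```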

Pulsar

Required Configuration:

  • Broker URL (e.g. pulsar://my-pulsar-broker:6650)

  • Topic(s) for data retrieval

  • Basic Authentication:

    • Username

    • Password

  • OAuth Authentication:

    • Client Id

    • Client Secret

    • Token URL

  • Certificate Authentication:

    • PKCS#12 or PEM certificate file

    • Certificate password

Capabilities:

  • Supports usage of mTLS certificates

  • Supports Pulsar message properties as event metadata

  • Supports durable subscriptions with offset management

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
topic | Topic | String | Name of the Pulsar topic from which the message was consumed
Custom Extensions | Message Properties | Various | Application-defined message properties attached to the Pulsar message
content-type | Content Type | String | application/octet-stream

All custom message properties from the Pulsar message are included in the CloudEvent metadata. No filtering is applied to properties.

Azure Blob Store

Required Configuration:

  • Container name (e.g. data)

  • Connection URL (e.g. storageaccountfoo.microsoft.com)

  • ADLS Gen2 enabled (whether the source should use the Azure Data Lake Storage Gen2 API)

  • Directory name (e.g. dir/subdir)

  • Authentication type (Azure AD/Service Principal, Shared Key)

  • Shared Key Authentication:

    • Account Name

    • Account Key

  • Azure AD/Service Principal Authentication:

    • Client Id

    • Client Secret

    • Tenant Id

Capabilities:

  • Supports Azure Blob Storage with flat files

  • Supports Azure Data Lake Gen2 with directories and files

  • Scaling: limited to one application instance per source

  • Deletes files from the storage account after ingest

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
blobname | Blob Name | String | Name/path of the Azure blob that was processed
Custom Extensions | Blob Metadata | Various | User-defined metadata key-value pairs attached to the blob
content-type | Content Type | String | application/octet-stream

Blob metadata entries with null values are excluded from the CloudEvent metadata. Only user-defined blob metadata properties are included.

MQTT

Required Configuration:

  • Broker URL (e.g. mqtt://my-mqtt-broker:1883)

  • Topics and QoS

  • Password Authentication:

    • Username

    • Password

  • Certificate Authentication:

    • PKCS#12 client certificate (base64 encoded)

    • Certificate password

    • JKS truststore (base64 encoded)

    • Truststore password

  • Retry configuration

Capabilities:

  • Supports persistent sessions

  • Supports QoS level 0, 1, 2

  • Supports MQTT v3.1.1 and v5.0

  • Scaling: limited to one application instance per source

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
topic | Topic | String | Name of the MQTT topic from which the message was consumed
Custom Extensions | MQTT Headers | Various | Application-defined header properties passed to the message processor
content-type | Content Type | String | application/octet-stream

All custom header properties are included in the CloudEvent metadata. The topic name is always included as a special metadata field.

FTP

Required Configuration:

  • Server URL (e.g. my-ftp-server.ftp)

  • Server Port (e.g. 22, the default SFTP port)

  • Directory from which to read data (e.g. upload)

  • Host key fingerprint of the SFTP server

  • Auth Type (password, publickey, publickey-password)

  • Password Authentication:

    • Username

    • Password

  • Public Key Authentication:

    • Public Key

  • Public Key + Password Authentication:

    • Public Key

    • Password

Capabilities:

  • The discovery of new files requires an agreed procedure for adding files to the SFTP directory:

    • New files are discovered based on their file modification time (mtime).

    • A file's mtime is updated when its content changes, but not when the file is copied into the directory.

    • A new file placed in the directory must therefore have a more recent mtime than all files copied before it.

    • This leads to the following requirement: given two files f1 and f2, if f2 is copied into the directory after f1, then f1.mtime < f2.mtime must hold. Two files must never have the same mtime.

  • Data providers must take special care to comply with this requirement. For example, if a file export job fails, the retried export must ensure that the file's mtime is more recent than those of all files copied in the meantime.
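The mtime requirement follows from how watermark-based discovery works. The sketch below is a simplified model, not the connector's code: it assumes the source remembers the highest mtime it has processed and only picks up files whose mtime is strictly greater. RemoteFile and discover_new_files are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RemoteFile:
    name: str
    mtime: float  # modification time as reported by the SFTP server


def discover_new_files(files, watermark):
    """Return (files newer than `watermark`, oldest first; the new watermark).

    Only files with an mtime strictly greater than the last processed mtime
    are picked up. A file copied later but carrying an older or equal mtime
    is therefore never discovered.
    """
    new_files = sorted((f for f in files if f.mtime > watermark), key=lambda f: f.mtime)
    new_watermark = new_files[-1].mtime if new_files else watermark
    return new_files, new_watermark
```

With a watermark of 200.0, a retried export whose file still carries mtime 150.0, or a second file that shares mtime 200.0, is never returned. This is exactly the situation data providers must avoid.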

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
ftpfilename | FTP File Name | String | Name of the FTP file that was processed
ftpfilepath | FTP File Path | String | Full path of the FTP file on the server
content-type | Content Type | String | application/octet-stream

FTP file metadata is extracted from the file system properties. File name and path are always included as metadata fields.

Geotab

Required Configuration:

  • Result count (e.g. 1000)

  • Initial version to poll (e.g. 0)

  • Scheduling rate (in ms):

    • feed (e.g. 60000)

    • all (e.g. 50000000)

  • Databases:

    • name (e.g. rb_internal)

    • server (e.g. my.geotab.com)

  • Basic Authentication:

    • Username

    • Password

Capabilities:

  • Supports pulling the following Geotab API feeds: Device, Diagnostic, FaultData, LogRecord, StatusData

  • Supports pulling all available data from the following endpoints: FailureMode

  • The source keeps its state (the version of each pulled feed) in the file system. When deployed, this uses the Azure CSI driver through the custom StorageClass hermes-file-storage.
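A version store of this kind can be sketched in a few lines of Python. The JSON file layout, the "database/feed" key scheme, the default initial version and the class name are assumptions for illustration; only the idea of persisting the last polled feed version in the file system comes from the description above.

```python
import json
from pathlib import Path


class FeedVersionStore:
    """Persist the last polled feed version per (database, feed).

    Hypothetical layout: one JSON file mapping "database/feed" to a version
    string. When deployed, the file would live on the mounted volume.
    """

    def __init__(self, path, initial_version="0"):
        self.path = Path(path)
        self.initial_version = initial_version  # used before the first poll

    def _load(self):
        if self.path.exists():
            return json.loads(self.path.read_text(encoding="utf-8"))
        return {}

    def get(self, database, feed):
        return self._load().get(f"{database}/{feed}", self.initial_version)

    def put(self, database, feed, version):
        state = self._load()
        state[f"{database}/{feed}"] = version
        # Write to a temporary file first, then replace the state file, so a
        # crash mid-write never leaves a truncated state file behind.
        tmp = self.path.with_suffix(".tmp")
        tmp.write_text(json.dumps(state), encoding="utf-8")
        tmp.replace(self.path)
```

In a polling loop, the stored version would be sent as the starting version of the next request and then replaced with the version returned in the response.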

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
database | Database | String | Name of the Geotab database being polled
user | User | String | Username used for Geotab API authentication
deviceid | Device ID | String | Device identifier (optional, only present for device-specific data)
version | Version | String | Current poll version from the Geotab API response
fromversion | From Version | String | Starting version for the poll request (default: “static”)
prefix | Prefix | String | Data type class name (e.g., “LogRecord”, “StatusData”)
content-type | Content Type | String | application/json

Device ID is only included when processing device-specific data. All other metadata fields are always present. The prefix field contains the simple class name of the data type being processed.

Data Ingest API (Data Push)

Required Configuration:

  • Technical documentation: API Documentation

  • Data Provider needs a tenant and client plus secret to ingest data

  • OAuth Authentication:

    • Tenant

    • Client Id

    • Client Secret

Capabilities:

  • Generic data ingest without schemas

  • Metadata can be attached as HTTP headers

  • Records the client ID of the OAuth technical user client as metadata

  • Maximum upload size per request: 15MB
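From the data provider's side, a push request combines an OAuth bearer token, the payload and optional metadata headers. The sketch below only builds the request with Python's standard urllib and does not send it. The endpoint URL, the X-METADATA-<name> header naming and reading 15MB as 15 MiB are assumptions; take the exact rules from the API documentation.

```python
import urllib.request

# Assumption: "15MB" interpreted as 15 MiB.
MAX_UPLOAD_BYTES = 15 * 1024 * 1024


def build_ingest_request(url, access_token, payload, content_type, metadata):
    """Prepare (but do not send) a POST request to the Data Ingest API.

    `url` and the X-METADATA-<name> header convention are hypothetical.
    """
    if len(payload) > MAX_UPLOAD_BYTES:
        raise ValueError(f"payload of {len(payload)} bytes exceeds the 15MB upload limit")
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": content_type,
    }
    for name, value in metadata.items():
        headers[f"X-METADATA-{name}"] = str(value)
    return urllib.request.Request(url, data=payload, headers=headers, method="POST")
```

The request can then be sent with urllib.request.urlopen once a token has been obtained from the OAuth provider (Bosch CIAM) for the configured tenant and client. Note that urllib normalizes header-name capitalization, which is harmless because HTTP header names are case-insensitive.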

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
datacontenttype | Content Type | String | Content type from the HTTP request header, or “application/octet-stream” by default
Custom Extensions | HTTP Headers | String | All HTTP request headers starting with X-METADATA, sanitized and added as CloudEvent extensions
ingestsubpath | Request URI | String | Complete request URI including path and query parameters

HTTP headers undergo sanitization before being added as extensions. Standard headers like Authorization may be filtered.
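On the receiving side, the header-to-extension mapping might look like the following sketch. The actual sanitization rules are not documented here; this version assumes names are reduced to lowercase ASCII letters and digits, which is what the CloudEvents 1.0 specification requires for attribute names and matches the style of fields such as dianaingestid. The prefix handling and function name are hypothetical.

```python
import re

PREFIX = "x-metadata"


def headers_to_extensions(headers):
    """Map X-METADATA* request headers to CloudEvent extension attributes.

    Assumed sanitization: strip the prefix, lowercase the rest and remove
    every character that is not an ASCII letter or digit.
    """
    extensions = {}
    for name, value in headers.items():
        lowered = name.lower()
        if not lowered.startswith(PREFIX):
            continue  # Authorization and other standard headers are ignored
        ext_name = re.sub(r"[^a-z0-9]", "", lowered[len(PREFIX):])
        if ext_name:
            extensions[ext_name] = value.strip()
    return extensions
```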

Data Sinks

Supported Infrastructure | Interface | Capabilities
--- | --- | ---
ActiveMQ, IBM MQ, RabbitMQ, Azure Event Hub, Solace | AMQP | AMQP 1.0, compression
Amazon S3, MinIO | AWS S3 | Upload to S3, compression
Azure Blob Store, Azurite Dev Container | Azure Blob Store | Upload to blob store, compression
Any HTTP API/Backend | HTTP | POST requests, one endpoint
IoT Insights data recorder API | IoT-Insights | Upload to IoT Insights project
Azure Event Hub Kafka Mode, AWS MSK Kafka Mode, Native Kafka Broker | Kafka | Kafka headers, event metadata
MQTT Broker (v3.1.1/v5.0) | MQTT | Multiple topics, match expressions

Details

The following metadata fields are present on CloudEvents leaving the processing pipeline for a sink.

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
dianaconsumertenant | Consumer Tenant | String | Tenant identifier of the event consumer
dianaconsumerpipelineid | Consumer Pipeline ID | String | Identifier for the consumer pipeline

AMQP

Required Configuration:

  • Broker URL (e.g. amqp://my-amqp-broker.amqp:5672)

  • Topic to send data to

  • Password Authentication:

    • Username

    • Password

  • Certificate Authentication:

    • Certificate file (PKCS#12 or PEM)

    • Certificate password

  • Compression on/off (can compress data before sending it)

Capabilities:

  • Supports protocol AMQP 1.0

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
uid | UID | String | Randomly generated UUID for message tracking
messageId | Message ID | String | Diana ingestion ID from the CloudEvent extension metadata

When compression is enabled, the AMQP sink compresses the original CloudEvent data before forwarding it. It also generates a unique tracking ID for each message sent to the broker.
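The AMQP sink's per-message preparation can be sketched as below. gzip as the compression format, the dianaingestid key and the function name are assumptions; the source documents only a compression on/off switch, a random UUID per message, and the reuse of the Diana ingestion ID as the message ID.

```python
import gzip
import uuid


def prepare_amqp_message(event_data, extensions, compress=True):
    """Return (body, properties) for one outgoing AMQP message."""
    # Assumption: gzip; only an on/off switch is documented.
    body = gzip.compress(event_data) if compress else event_data
    properties = {
        "uid": str(uuid.uuid4()),                     # random per-message tracking ID
        "messageId": extensions.get("dianaingestid"),  # ingestion ID reused as message ID
    }
    return body, properties
```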

AWS S3

Required Configuration:

  • Endpoint (e.g. s3.eu-central-1.amazonaws.com)

  • Region (e.g. eu-central-1)

  • URL path style (whether path-style bucket URLs should be used)

  • AWS IAM Authentication:

    • Access Key

    • Access Secret

  • Processor: Compression (gzip/zip/none)

  • Forwarder configuration:

    • Match configuration (which CloudEvent attribute should be matched)

    • Forwarding configuration (bucket, endpoint, filename pattern, duplicate prevention)

Capabilities:

  • Can upload data to one or more S3 endpoint and bucket combinations

  • Object names can be built dynamically from header fields

  • Data can be compressed before it is uploaded to the S3 bucket

Metadata Attachments:

The following metadata is forwarded when uploading files to AWS S3:

Field | Name | Type | Description
--- | --- | --- | ---
fileName | File Name | String | Generated file name based on the configured pattern (adjusted for compression, if applicable)
content-type | Content Type | String | Data content type from the CloudEvent (e.g., application/json)
contentMD5 | Content MD5 | String | Base64-encoded MD5 hash of the uploaded payload (after compression, if enabled), for data integrity verification
S3 object metadata | Custom Extensions | Various | All CloudEvent extension properties, included as S3 object metadata with normalized names

All CloudEvent extension properties are automatically included in the S3 object metadata. Property names are normalized to comply with S3 metadata naming requirements.
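For S3, the Content-MD5 value is the Base64 encoding of the raw 16-byte MD5 digest (RFC 1864), not the hex string. The sketch computes it with the standard library and shows one possible metadata-name normalization; the normalization rules and function names are a hypothetical stand-in for the connector's own.

```python
import base64
import hashlib
import re


def content_md5(body):
    """Base64-encoded MD5 digest of `body`, the format S3 expects in Content-MD5."""
    return base64.b64encode(hashlib.md5(body).digest()).decode("ascii")


def to_s3_metadata(extensions):
    """Normalize CloudEvent extensions into S3 user-defined metadata.

    Hypothetical normalization: lowercase names, characters outside
    [a-z0-9-] replaced with '-', values stringified and forced to ASCII.
    """
    metadata = {}
    for name, value in extensions.items():
        key = re.sub(r"[^a-z0-9-]", "-", name.lower())
        metadata[key] = str(value).encode("ascii", errors="replace").decode("ascii")
    return metadata
```

The digest must be computed over the bytes actually uploaded, so with compression enabled it is taken after compressing.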

Azure Blob Store

Required Configuration:

  • Container name (e.g. data)

  • Connection URL (e.g. storageaccountfoo.microsoft.com)

  • Blob naming pattern

  • Duplicate prevention on/off

  • Shared Key Authentication:

    • Account Name

    • Account Key

  • Azure AD/Service Principal Authentication:

    • Client Id

    • Client Secret

    • Tenant Id

  • Compression on/off

Capabilities:

  • Can upload data to one or more storage account and container combinations

  • Blob names can be built dynamically from header fields

  • Data can be compressed before it is uploaded to the blob store

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
Content-Type | Content Type | String | Data content type from the CloudEvent (used as the blob HTTP header)
Content-MD5 | Content MD5 | String | MD5 hash of the uploaded payload (after compression, if enabled), for data integrity verification
blob metadata | Custom Extensions | String | All CloudEvent extension properties, normalized and included as blob metadata with string values

CloudEvent extension names are normalized using StringNormalizer before being added as blob metadata. When the road signature feature is enabled, additional metadata mappings and static values are automatically included. The blob name is generated using configurable patterns and may be modified based on compression settings.
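Blob-name generation from a pattern can be sketched with Python's str.format_map. The {field} placeholder syntax, the function name and the .gz suffix for compressed blobs are assumptions made for illustration; the connector's pattern language is not described here.

```python
def render_blob_name(pattern, extensions, compressed=False):
    """Render a blob name from a pattern such as "{dianadatastreamid}/{dianaingestid}.json".

    Hypothetical conventions: {field} placeholders refer to CloudEvent
    extension names, and compressed blobs get a ".gz" suffix.
    """
    name = pattern.format_map(extensions)  # raises KeyError if a field is missing
    return name + ".gz" if compressed else name
```

format_map raises KeyError when a referenced field is missing, so a misconfigured pattern fails loudly instead of producing an ambiguous blob name.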

HTTP

Required Configuration:

  • Target backend URL (e.g. http://foo.bar/data)

  • Basic Authentication:

    • Username

    • Password

  • OAuth:

    • IDP Token URL

    • Client Id

    • Client Secret

    • Scope

    • Grant Type (client_credentials)

  • API Key Authentication:

    • API Key

  • C-Technology Update API Authentication:

    • Auth URL

    • Token URL

    • Client Secret

    • Token Prefetch Seconds

  • Geotab DIG Authentication:

    • Additional properties per Geotab DIG specification

Capabilities:

  • Can upload data to exactly one endpoint

  • Performs POST requests

  • The content type is taken from the originally ingested data

  • The response is evaluated only to detect errors (4xx and 5xx status codes); the response body is discarded
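The HTTP sink's error handling, where only the status code matters, can be sketched with the standard library. urllib.request.urlopen raises HTTPError for 4xx and 5xx responses, so the sketch converts that into a failure and otherwise drains and discards the body. The function name, the timeout and the RuntimeError wrapping are assumptions; authentication and SpEL-based headers are omitted.

```python
import urllib.error
import urllib.request


def post_event(url, body, content_type="application/octet-stream", headers=None, timeout=10.0):
    """POST one event to the configured endpoint and return the status code.

    urlopen raises HTTPError for 4xx/5xx responses; it is re-raised here as
    RuntimeError. A successful response body is read and discarded.
    """
    all_headers = {"Content-Type": content_type, **(headers or {})}
    request = urllib.request.Request(url, data=body, headers=all_headers, method="POST")
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            response.read()  # drain the body; its content is not used
            return response.status
    except urllib.error.HTTPError as err:
        raise RuntimeError(f"sink endpoint returned HTTP {err.code}") from err
```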

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
Content-Type | Content Type | String | Data content type from the CloudEvent or the configured content type (defaults to application/octet-stream)
Custom Headers | Custom Extensions | Various | Configured HTTP headers built from SpEL expressions that can access CloudEvent properties and extensions

Custom headers are dynamically generated using Spring Expression Language (SpEL) expressions that have access to all CloudEvent properties and extensions. The actual headers sent depend on the sink configuration.

IoT-Insights

Required Configuration:

  • IoT Insights data recorder project URL (e.g. https://bosch-iot-insights.com/data-recorder-service/v2/pqc8349)

  • Basic Authentication:

    • Username

    • Password

Capabilities:

  • Can upload data to exactly one IoT Insights Project

  • Supports basic authentication as provided by IoT Insights

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
X-Metadata | X-Metadata | String | Comma-separated list of CloudEvent extensions and the content type, annotated with their data types
Content-MD5 | Content MD5 | String | Base64-encoded MD5 hash of the compressed payload for data integrity verification

All CloudEvent extension properties are automatically included in the X-Metadata header with type annotations. Invalid characters (commas and semicolons) in string values are replaced with spaces for header compliance.
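A sketch of how such an X-Metadata value could be assembled. The entry layout name;type;value, the type labels and the contenttype entry name are hypothetical, since the exact header grammar is not given here. Only three points come from the description above: the comma-separated list, the type annotations, and the replacement of commas and semicolons in string values.

```python
def _type_label(value):
    """Hypothetical type annotation for one extension value."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "double"
    return "string"


def build_x_metadata(extensions, content_type):
    """Assemble a hypothetical X-Metadata value of comma-joined name;type;value entries."""
    entries = []
    for name, value in {**extensions, "contenttype": content_type}.items():
        label = _type_label(value)
        if label == "string":
            # Commas and semicolons would break the header syntax: use spaces.
            text = str(value).replace(",", " ").replace(";", " ")
        elif label == "boolean":
            text = "true" if value else "false"
        else:
            text = str(value)
        entries.append(f"{name};{label};{text}")
    return ",".join(entries)
```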

Kafka

Required Configuration:

  • Bootstrap Server URL (e.g. my-kafka-cluster.kafka:9092)

  • Topic to send data to

  • Auth Mechanism (SASL/PLAINTEXT)

  • Profiles: sasl, mtls, oauth, aws, eventhub

  • SASL Authentication:

    • Username

    • Password

  • mTLS Authentication:

    • Certificate file

    • Certificate password

  • OAuth Authentication (Generic, EventHub, AWS):

    • Client Id

    • Client Secret

    • Specific details for provider

Capabilities:

  • Supports OAuth Authentication (generic, Azure EventHub, AWS MSK)

  • Supports usage of mTLS certificates

  • Supports creation of Kafka headers from event metadata

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
content-type | Content Type | String | Data content type from the CloudEvent (if present)
Kafka Headers | Custom Extensions | String | All CloudEvent extension properties, included as Kafka headers with string values

All CloudEvent extension properties are automatically converted to string values and added as individual Kafka headers. The message payload is compressed before being sent to the specified Kafka topic.
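The Kafka sink's conversion can be sketched as a function that returns the compressed value and a header list. A list of (str, bytes) pairs is the shape accepted by the headers argument of kafka-python's KafkaProducer.send and confluent-kafka's Producer.produce; gzip as the compression codec, UTF-8 for header values and the function name are assumptions.

```python
import gzip


def to_kafka_record(event_data, extensions, content_type=None):
    """Return (value, headers) ready to hand to a Kafka producer."""
    # Every extension becomes one header whose value is its string form.
    headers = [(name, str(value).encode("utf-8")) for name, value in extensions.items()]
    if content_type is not None:
        # content-type is only added when the CloudEvent carries one.
        headers.append(("content-type", content_type.encode("utf-8")))
    # Assumption: gzip compression of the payload.
    return gzip.compress(event_data), headers
```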

MQTT

Required Configuration:

  • Broker URL (e.g. mqtt://my-mqtt-broker:1883)

  • Topics, QoS and retained message

  • Retry configuration

  • SASL Authentication:

    • Username

    • Password

  • mTLS Authentication:

    • Certificate file

    • Certificate password

  • Event Hub Mode Authentication:

    • Azure Tenant Id

    • Azure Client Id

    • Azure Client Secret

Capabilities:

  • Supports persistent sessions

  • Supports QoS level 0, 1, 2

  • Supports MQTT v3.1.1 and v5.0

  • Scaling: limited to one application instance per sink

Metadata Attachments:

Field | Name | Type | Description
--- | --- | --- | ---
content-type | Content Type | String | Data content type from the CloudEvent (if present)
MQTT User Properties | Custom Extensions | String | All CloudEvent extension properties, included as MQTT user properties with string values

All CloudEvent extension properties are automatically converted to string values and added as individual MQTT user properties. The message payload is compressed before being published to the specified MQTT topic with configurable QoS and retention settings.