Connector Catalog#
Connector Components in DIANA System#
Metadata Flow Through the Pipeline#
The Diana platform uses comprehensive metadata to track and manage data as it flows through the integration pipeline. As an event moves through the system, it accumulates metadata headers at several stages, creating a rich context for data lineage, routing decisions, filtering operations, and processing logic:
- Source stage: initial headers are attached when data is ingested from source connectors.
- Transformation stage: additional headers may be added during transformation, and existing metadata may be modified based on transformation logic.
- Sink stage: final metadata is applied before data reaches its destination.
This layered approach ensures complete traceability and context preservation throughout the data journey, while enabling sophisticated routing and filtering capabilities.
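To make the flow concrete, here is a minimal sketch using the CloudEvents Java SDK of how a source stage and a later stage can layer extension attributes onto the same event. The extension keys used (producertenant, datastreamid) and the source URI are illustrative assumptions, not the platform's actual field names.

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Sketch of metadata accumulating across stages; extension keys are illustrative.
public class MetadataFlowExample {
    public static void main(String[] args) {
        byte[] payload = "{\"speed\": 42}".getBytes(StandardCharsets.UTF_8);

        // Source stage: initial headers are attached at ingest time.
        CloudEvent ingested = CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create("/sources/amqp"))
                .withType("com.example.telemetry")
                .withData("application/json", payload)
                .withExtension("producertenant", "tenant-a")
                .build();

        // Transformation stage: more context is layered on without discarding earlier headers.
        CloudEvent transformed = CloudEventBuilder.from(ingested)
                .withExtension("datastreamid", "stream-42")
                .build();

        transformed.getExtensionNames()
                .forEach(name -> System.out.println(name + " = " + transformed.getExtension(name)));
    }
}
```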
Data Sources#
| Supported Infrastructure | Interface | Capabilities |
|---|---|---|
| HTTP clients (Basic Auth/OAuth) | Data Ingest API (Data Push) | Generic ingest, metadata via HTTP header, max 15MB/upload. API documentation |
| Azure Blob Storage Account | Azure Blob Store | Supports flat files, Data Lake Gen2, deletes files after ingest |
| Active MQ, IBM MQ, Rabbit MQ, Azure Event Hub, Solace | AMQP | AMQP 1.0, multiple queues, event metadata |
| Apache Pulsar | Pulsar | Multiple topics, event metadata |
| Azure Event Hub Kafka Mode, AWS MSK Kafka Mode, Native Kafka Broker | Kafka | Kafka headers, event metadata |
| MQTT | MQTT | MQTT 3.1.1/5.0, multiple topics, QoS, auto reconnect |
| FTP server | FTP | SFTP file discovery, mtime requirements |
| Geotab API | Geotab | Pulls Geotab feeds, keeps state in file system |
Details#
Metadata fields present on events entering the system.
| Field | Name | Type | Description |
|---|---|---|---|
| | Ingest ID | String | Unique identifier for the ingestion process. |
| | Ingest Timestamp | String/OffsetDateTime | Timestamp when the event was ingested into the system. Auto-generated if not present. |
| | Data Size | String/Integer | Size of the event payload in bytes. Calculated from actual data if not present. |
| | Producer Tenant | String | Tenant identifier of the event producer. Set from configuration. |
| | Data Stream ID | String | Identifier for the data stream. Set from sidecar configuration. |
| | MD5 Hash | String | MD5 hash of the original message payload, calculated before compression or transformation, providing data integrity verification. |
Note
Default metadata values are only set when fields are missing, preserving existing metadata through multiple processing stages while ensuring all mandatory fields are present for downstream components.
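As a small illustration of that rule, the sketch below (using the CloudEvents Java SDK) only writes a field when no earlier stage has set it. The extension keys producertenant and ingesttimestamp are assumptions for illustration, not the platform's real field names.

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import java.time.OffsetDateTime;

// Sketch of the "default only when missing" rule; extension keys are illustrative.
public class MetadataDefaults {
    static CloudEvent applyDefaults(CloudEvent event, String tenantFromConfig) {
        CloudEventBuilder builder = CloudEventBuilder.from(event);
        if (event.getExtension("producertenant") == null) {     // keep value set by earlier stages
            builder.withExtension("producertenant", tenantFromConfig);
        }
        if (event.getExtension("ingesttimestamp") == null) {    // auto-generate only if missing
            builder.withExtension("ingesttimestamp", OffsetDateTime.now().toString());
        }
        return builder.build();
    }
}
```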
AMQP#
Required Configuration:
- Broker URL (e.g. amqp://my-amqp-broker.amqp:5672)
- Topic(s) for data retrieval
Password Authentication:
- Username
- Password
Certificate Authentication:
- PKCS#12 or PEM certificate file
- Certificate password
Capabilities:
- Supports protocol AMQP 1.0
- Can read from multiple queues of the same broker
- Supports AMQP headers as event metadata
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Queue | String | Name of the AMQP queue from which the message was consumed |
| Custom Extensions | Custom JMS Properties | Various | Application-defined message properties (excluding JMSX-prefixed system properties) |
| | Content Type | String | |
System properties with JMSX prefix are filtered out to maintain clean metadata. Properties with null values are excluded.
Kafka#
Required Configuration:
- Bootstrap Server URL (e.g. my-kafka-cluster.kafka:9092)
- Topic(s) for data retrieval
- Auth Mechanism (SASL/PLAINTEXT)
- Profiles: sasl, mtls, oauth, aws, eventhub
SASL Authentication:
- Username
- Password
mTLS Authentication:
- PKCS#12 or PEM certificate file
- Certificate password
OAuth Authentication (Generic, EventHub, AWS):
- Client Id
- Client Secret
- Provider-specific details
Capabilities:
- Supports OAuth authentication (generic, Azure Event Hub, AWS MSK)
- Supports usage of mTLS certificates
- Supports Kafka headers as event metadata
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Topic | String | Name of the Kafka topic from which the message was consumed |
| | Kafka Key | String | Kafka message key (optional, only when a key is present) |
| Custom Extensions | Kafka Headers | Various | All Kafka message headers with sanitized values |
| | Content Type | String | |
Kafka message key is only included when present in the original message. All Kafka headers are included with values sanitized based on the active profile (standard Kafka vs EventHub).
Pulsar#
Required Configuration:
- Broker URL (e.g. pulsar://my-pulsar-broker:6650)
- Topic(s) for data retrieval
Basic Authentication:
- Username
- Password
OAuth Authentication:
- Client Id
- Client Secret
- Token URL
Certificate Authentication:
- PKCS#12 or PEM certificate file
- Certificate password
Capabilities:
- Supports usage of mTLS certificates
- Supports Pulsar headers as event metadata
- Supports durable subscriptions with offset management
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Topic | String | Name of the Pulsar topic from which the message was consumed |
| Custom Extensions | Message Properties | Various | Application-defined message properties attached to the Pulsar message |
| | Content Type | String | |
All custom message properties from the Pulsar message are included in the CloudEvent metadata. No filtering is applied to properties.
Azure Blob Store#
Required Configuration:
- Container name (e.g. data)
- Connection URL (e.g. storageaccountfoo.microsoft.com)
- ADLS Gen2 enabled (whether the source should use the Azure Data Lake Gen2 API)
- Directory name (e.g. dir/subdir)
- Authentication type (Azure AD/Service Principal, Shared Key)
Shared Key Authentication:
- Account Name
- Account Key
Azure AD/Service Principal Authentication:
- Client Id
- Client Secret
- Tenant Id
Capabilities:
- Supports Azure Blob Storage with flat files
- Supports Azure Data Lake Gen2 with directories and files
- Scaling: limited to one application instance per source
- Deletes files in the storage account after ingest
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Blob Name | String | Name/path of the Azure blob that was processed |
| Custom Extensions | Blob Metadata | Various | User-defined metadata key-value pairs attached to the blob |
| | Content Type | String | |
Blob metadata entries with null values are excluded from the CloudEvent metadata. Only user-defined blob metadata properties are included.
MQTT#
Required Configuration:
- Broker URL (e.g. mqtt://my-mqtt-broker:1883)
- Topics and QoS
Password Authentication:
- Username
- Password
Certificate Authentication:
- PKCS#12 client certificate (base64 encoded)
- Certificate password
- JKS truststore (base64 encoded)
- Truststore password
- Retry configuration
Capabilities:
- Supports persistent sessions
- Supports QoS levels 0, 1, 2
- Supports MQTT v3.1.1 and v5.0
- Scaling: limited to one application instance per source
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Topic | String | Name of the MQTT topic from which the message was consumed |
| Custom Extensions | MQTT Headers | Various | Application-defined header properties passed to the message processor |
| | Content Type | String | |
All custom header properties are included in the CloudEvent metadata. The topic name is always included as a special metadata field.
FTP#
Required Configuration:
- Server URL (e.g. my-ftp-server.ftp)
- Server Port (22)
- Directory from which to read data (e.g. upload)
- Key fingerprint of the FTP server
- Auth Type (password, publickey, publickey-password)
Password Authentication:
- Username
- Password
Public Key Authentication:
- Public Key
Public Key + Password Authentication:
- Public Key
- Password
Capabilities:
The discovery of new files requires an agreed handling of files being added to the SFTP directory:
- New files are discovered based on the file modification time (mtime).
- The file mtime is updated when file content is changed, but not when the file is copied into the directory.
- A new file placed in the directory must therefore have a more recent modification time than all files copied before it.
This leads to the following requirement: suppose there are two files f1 and f2. If f2 is copied into the directory after f1, then it must be ensured that f1.mtime < f2.mtime; the files must not have the same mtime.
On the data provider side, special considerations are necessary to comply with this requirement. For example, if a file export job fails, a retry of the export must ensure that the file mtime is more recent than that of all files copied in the meantime; one way to do this is sketched below.
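As a provider-side illustration of this requirement, the sketch below copies a file into the watched directory and then explicitly bumps its modification time, so the new file is guaranteed to be newer than anything copied earlier. The directory and file names are assumptions; this shows one possible approach, not a prescribed procedure.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;

// Provider-side sketch: ensure a freshly copied file has an mtime newer than all
// previously copied files, as required by the FTP source's discovery mechanism.
public class FtpUploadMtimeExample {
    public static void main(String[] args) throws IOException {
        Path source = Path.of("export/report.csv");                      // hypothetical export file
        Path target = Path.of("upload").resolve(source.getFileName());   // directory watched by the FTP source

        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        // A plain copy may preserve the original mtime; bump it to "now" so the new
        // file is strictly newer than everything copied before (f1.mtime < f2.mtime).
        Files.setLastModifiedTime(target, FileTime.from(Instant.now()));
    }
}
```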
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | FTP File Name | String | Name of the FTP file that was processed |
| | FTP File Path | String | Full path of the FTP file on the server |
| | Content Type | String | |
FTP file metadata is extracted from the file system properties. File name and path are always included as metadata fields.
Geotab#
Required Configuration:
- Result count (e.g. 1000)
- Initial version to poll (e.g. 0)
- Scheduling rate:
  - feed (e.g. 60000 ms)
  - all (e.g. 50000000 ms)
- Databases:
  - name (e.g. rb_internal)
  - server (e.g. my.geotab.com)
Basic Authentication:
- Username
- Password
Capabilities:
- Supports pulling the following Geotab API feeds: Device, Diagnostic, FaultData, LogRecorder, Status
- Supports pulling the following endpoints for all available data: FailureMode
- The source keeps state (the version of the pulled feed) in the file system. When deployed, this uses the Azure CSI driver through the custom StorageClass hermes-file-storage.
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Database | String | Name of the Geotab database being polled |
| | User | String | Username used for Geotab API authentication |
| | Device ID | String | Device identifier (optional, only when processing device-specific data) |
| | Version | String | Current poll version from the Geotab API response |
| | From Version | String | Starting version for the poll request (default: “static”) |
| | Prefix | String | Data type class name (e.g., “LogRecord”, “StatusData”) |
| | Content Type | String | |
Device ID is only included when processing device-specific data. All other metadata fields are always present. The prefix field contains the simple class name of the data type being processed.
Data Ingest API (Data Push)#
Required Configuration:
- Technical documentation: API Documentation
- Data providers need a tenant plus a client and secret to ingest data
OAuth Authentication:
- Tenant
- Client Id
- Client Secret
Capabilities:
- Generic data ingest without schemas
- Metadata can be attached as HTTP headers
- Records the client id of the OAuth technical user client as metadata
- Max upload limit per request: 15MB
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Content Type | String | Content type from the HTTP request header, or “application/octet-stream” as default |
| Custom Extensions | HTTP Headers | String | All HTTP request headers starting with X-METADATA, sanitized and added as CloudEvent extensions |
| | Request URI | String | Complete request URI including path and query parameters |
HTTP headers undergo sanitization before being added as extensions. Standard headers like Authorization may be filtered.
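For orientation, a minimal push to the Data Ingest API might look like the sketch below (Java 11+ HttpClient). The endpoint URL, the X-METADATA header name, and the way the OAuth token is obtained are assumptions for illustration; the linked API documentation is authoritative.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

// Sketch of a data push with metadata attached as an X-METADATA header.
// Endpoint, header names and token handling are illustrative assumptions.
public class DataPushExample {
    public static void main(String[] args) throws Exception {
        String accessToken = "<oauth-access-token>";   // obtained via the configured tenant/client/secret

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://ingest.example.com/data"))   // hypothetical ingest URL
                .header("Authorization", "Bearer " + accessToken)
                .header("Content-Type", "application/json")
                .header("X-METADATA-VEHICLE-ID", "vin-1234")          // becomes a CloudEvent extension
                // the payload must stay below the 15MB per-request limit
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("payload.json")))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Ingest responded with HTTP " + response.statusCode());
    }
}
```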
Data Sinks#
| Supported Infrastructure | Interface | Capabilities |
|---|---|---|
| Active MQ, IBM MQ, Rabbit MQ, Azure Event Hub, Solace | AMQP | AMQP 1.0, compression |
| Amazon S3, Minio | AWS S3 | Upload to S3, compression |
| Azure Blob Store, Azurite Dev Container | Azure Blob Store | Upload to blob store, compression |
| Any HTTP API/Backend | HTTP | POST requests, one endpoint |
| IoT Insights data recorder API | IoT-Insights | Upload to IoT Insights project |
| Azure Event Hub Kafka Mode, AWS MSK Kafka Mode, Native Kafka Broker | Kafka | Kafka headers, event metadata |
| MQTT Broker (v3.1.1/v5.0) | MQTT | Multiple topics, match expressions |
Details#
Metadata fields present on CloudEvents leaving the processing pipeline towards a sink.
Metadata Attachments:
| Field | Display Name | Type | Description |
|---|---|---|---|
| | Consumer Tenant | String | Tenant identifier of the event consumer. |
| | Consumer Pipeline ID | String | Identifier for the consumer pipeline. |
AMQP#
Required Configuration:
- Broker URL (e.g. amqp://my-amqp-broker.amqp:5672)
- Topic to send data to
Password Authentication:
- Username
- Password
Certificate Authentication:
- Certificate file (PKCS#12 or PEM)
- Certificate password
- Compression on/off (can compress data before sending it)
Capabilities:
- Supports protocol AMQP 1.0
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | UID | String | Randomly generated UUID for message tracking |
| | Message ID | String | Diana ingestion ID from the CloudEvent extension metadata |
The AMQP sink compresses the original CloudEvent data before forwarding and generates a unique tracking ID for each message sent to the broker.
AWS S3#
Required Configuration:
- Endpoint (e.g. s3.eu-central-1.amazonaws.com)
- Region (e.g. eu-central-1)
- URL path-style (defines whether path-style bucket URLs should be used)
AWS IAM Authentication:
- Access Key
- Access Secret
Processor: Compression (gzip/zip/none)
Forwarder configuration:
- Match configuration (which CloudEvent attribute should be matched)
- Forwarding configuration (bucket, endpoint, filename pattern, duplicate prevention)
Capabilities:
- Can upload data to one or more S3 endpoint and bucket combinations
- Data blob name can be dynamically built from header fields
- Data can be compressed before uploading it to the S3 bucket
Metadata Attachments: The following metadata is forwarded when uploading files to AWS S3:
| Field | Name | Type | Description |
|---|---|---|---|
| | File Name | String | Generated file name based on the configured pattern, compressed if applicable |
| | Content Type | String | Data content type from the CloudEvent |
| | Content MD5 | String | Base64 encoded MD5 hash of the compressed payload for data integrity verification |
| S3 object metadata | Custom Extensions | Various | All CloudEvent extension properties are included as S3 object metadata with normalized names |
All CloudEvent extension properties are automatically included in the S3 object metadata. Property names are normalized to comply with S3 metadata naming requirements.
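The Content MD5 value described above can be pictured with the following sketch, which compresses a payload, hashes it, and Base64-encodes the digest. The use of gzip is an assumption (the processor also supports zip or no compression); the sketch only illustrates the hashing order, not the sink's actual implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

// Sketch: derive a Content-MD5 value for a compressed payload (compress, hash, Base64-encode).
public class ContentMd5Example {
    public static void main(String[] args) throws Exception {
        byte[] payload = "{\"speed\": 42}".getBytes(StandardCharsets.UTF_8);

        // Compress the payload the same way it will be uploaded (gzip assumed here).
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(payload);
        }
        byte[] compressed = buffer.toByteArray();

        // Content-MD5 is the Base64-encoded MD5 digest of the bytes actually stored.
        byte[] digest = MessageDigest.getInstance("MD5").digest(compressed);
        System.out.println("Content-MD5: " + Base64.getEncoder().encodeToString(digest));
    }
}
```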
Azure Blob Store#
Required Configuration:
- Container name (e.g. data)
- Connection URL (e.g. storageaccountfoo.microsoft.com)
- Blob naming pattern
- Duplicate prevention on/off
Shared Key Authentication:
- Account Name
- Account Key
Azure AD/Service Principal Authentication:
- Client Id
- Client Secret
- Tenant Id
- Compression on/off
Capabilities:
- Can upload data to one or more storage account and container combinations
- Data blob name can be dynamically built from header fields
- Data can be compressed before uploading it to the blob store
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Content Type | String | Data content type from the CloudEvent (used as blob HTTP header) |
| | Content MD5 | String | MD5 hash of the compressed payload for data integrity verification |
| Blob metadata | Custom Extensions | String | All CloudEvent extension properties are normalized and included as blob metadata with string values |
CloudEvent extension names are normalized using StringNormalizer before being added as blob metadata. When the road signature feature is enabled, additional metadata mappings and static values are automatically included. The blob name is generated using configurable patterns and may be modified based on compression settings.
HTTP#
Required Configuration:
- Target backend URL (e.g. http://foo.bar/data)
Basic Authentication:
- Username
- Password
OAuth:
- IDP Token URL
- Client Id
- Client Secret
- Scope
- Grant Type (client_credentials)
API Key Authentication:
- API Key
C-Technology Update API Authentication:
- Auth URL
- Token URL
- Client Secret
- Token Prefetch Seconds
Geotab DIG Authentication:
- Additional properties per Geotab DIG specification
Capabilities:
- Can upload data to exactly one endpoint
- Performs POST requests
- The content type is taken over from the originally ingested data
- The response is only evaluated to detect errors (4XX, 5XX status codes); the response body itself is discarded
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Content Type | String | Data content type from the CloudEvent, or the configured default content type |
| Custom Headers | Custom Extensions | Various | Configured HTTP headers using SpEL expressions that can access CloudEvent properties and extensions |
Custom headers are dynamically generated using Spring Expression Language (SpEL) expressions that have access to all CloudEvent properties and extensions. The actual headers sent depend on the sink configuration.
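To illustrate how a SpEL expression can turn CloudEvent properties into a header value, here is a minimal standalone sketch using Spring's expression API and the CloudEvents Java SDK. The expression, the header name, and the extension key are illustrative; how the sink exposes the event to SpEL is defined by its configuration, not by this sketch.

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

import java.net.URI;

// Sketch: evaluate a SpEL expression against a CloudEvent to build a custom header value.
public class SpelHeaderExample {
    public static void main(String[] args) {
        CloudEvent event = CloudEventBuilder.v1()
                .withId("42")
                .withSource(URI.create("/sources/demo"))
                .withType("com.example.demo")
                .withExtension("producertenant", "tenant-a")   // assumed extension key
                .build();

        ExpressionParser parser = new SpelExpressionParser();
        StandardEvaluationContext context = new StandardEvaluationContext(event);

        // Builds e.g. "tenant-a/42" from an extension and the event id.
        String headerValue = parser
                .parseExpression("getExtension('producertenant') + '/' + id")
                .getValue(context, String.class);

        System.out.println("X-Routing-Key: " + headerValue);   // hypothetical header name
    }
}
```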
IoT Insights#
Required Configuration:
- IoT Insights data recorder project URL (e.g. https://bosch-iot-insights.com/data-recorder-service/v2/pqc8349)
Basic Authentication:
- Username
- Password
Capabilities:
- Can upload data to exactly one IoT Insights project
- Supports basic authentication as provided by IoT Insights
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | X-Metadata | String | Comma-separated list of typed CloudEvent extensions and content type with data type annotations |
| | Content MD5 | String | Base64 encoded MD5 hash of the compressed payload for data integrity verification |
All CloudEvent extension properties are automatically included in the X-Metadata header with type annotations. Invalid characters (commas and semicolons) in string values are replaced with spaces for header compliance.
Kafka#
Required Configuration:
- Bootstrap Server URL (e.g. my-kafka-cluster.kafka:9092)
- Topic to send data to
- Auth Mechanism (SASL/PLAINTEXT)
- Profiles: sasl, mtls, oauth, aws, eventhub
SASL Authentication:
- Username
- Password
mTLS Authentication:
- Certificate file
- Certificate password
OAuth Authentication (Generic, EventHub, AWS):
- Client Id
- Client Secret
- Provider-specific details
Capabilities:
- Supports OAuth authentication (generic, Azure Event Hub, AWS MSK)
- Supports usage of mTLS certificates
- Supports creation of Kafka headers from event metadata
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Content Type | String | Data content type from the CloudEvent (if present) |
| Kafka Headers | Custom Extensions | String | All CloudEvent extension properties are included as Kafka headers with string values |
All CloudEvent extension properties are automatically converted to string values and added as individual Kafka headers. The message payload is compressed before being sent to the specified Kafka topic.
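A minimal sketch of that mapping is shown below: each CloudEvent extension is copied into its own Kafka header as a string, alongside the content type. The topic name is hypothetical and the compressed payload is assumed to be produced elsewhere; this is not the sink's actual code.

```java
import io.cloudevents.CloudEvent;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// Sketch: build a Kafka record whose headers carry the CloudEvent extensions as strings.
public class KafkaHeaderExample {
    static ProducerRecord<String, byte[]> toRecord(CloudEvent event, byte[] compressedPayload) {
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>("diana-out", compressedPayload);   // hypothetical topic name

        if (event.getDataContentType() != null) {
            record.headers().add("content-type",
                    event.getDataContentType().getBytes(StandardCharsets.UTF_8));
        }
        // Each extension becomes its own header with a string value.
        for (String name : event.getExtensionNames()) {
            record.headers().add(name,
                    String.valueOf(event.getExtension(name)).getBytes(StandardCharsets.UTF_8));
        }
        return record;
    }
}
```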
MQTT#
Required Configuration:
- Broker URL (e.g. mqtt://my-mqtt-broker:1883)
- Topics, QoS and retained message flag
- Retry configuration
SASL Authentication:
- Username
- Password
mTLS Authentication:
- Certificate file
- Certificate password
Event Hub Mode Authentication:
- Azure Tenant Id
- Azure Client Id
- Azure Client Secret
Capabilities:
- Supports persistent sessions
- Supports QoS levels 0, 1, 2
- Supports MQTT v3.1.1 and v5.0
- Scaling: limited to one application instance per sink
Metadata Attachments:
| Field | Name | Type | Description |
|---|---|---|---|
| | Content Type | String | Data content type from the CloudEvent (if present) |
| MQTT User Properties | Custom Extensions | String | All CloudEvent extension properties are included as MQTT user properties with string values |
All CloudEvent extension properties are automatically converted to string values and added as individual MQTT user properties. The message payload is compressed before being published to the specified MQTT topic with configurable QoS and retention settings.