Kafka messages can be exported to and imported from Microsoft Cloud for Financial Services (FSI). This cloud solution offers various components, including a unified customer profile for managing customer data. It also has the capability to store personally identifiable information (PII). Data can flow from Kafka to Azure EventHub, and from there, Logic Apps can synchronize the data to DataVerse, which FSI can then consume. This workflow is illustrated in the diagram below:

To set up this connection, follow the steps below:

1. Sending Events to Azure EventHub

For example, you can use the Python script below to send three simple event messages to Azure EventHub.

import time
import os
import json
from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventHubError

# Replace placeholders with your EventHub name and connection string
EVENTHUB_NAME = "REPLACE_WITH_EVENTHUB_NAME"
CONNECTION_STR = "Endpoint=sb://REPLACE_WITH_CONNECTION_STRING.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=REPLACE_WITH_SHARED_ACCESS_KEY"

body = json.dumps({"id": "something"})

def send_event_data_batch(producer, i):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(body))
    producer.send_batch(event_data_batch)

producer = EventHubProducerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    eventhub_name=EVENTHUB_NAME
)

start_time = time.time()
with producer:
    for i in range(3):
        send_event_data_batch(producer, i)

print("Sent messages in {} seconds.".format(time.time() - start_time))

Replace the placeholders for the EventHub name and connection string. If the message is sent successfully, you will see output similar to “Sent messages in 1.730254888534546 seconds.”

If you encounter the error “Authentication Put-Token failed. Retries exhausted,” double-check the placeholder values to ensure they are correct.

2. Connecting Azure EventHub to Logic Apps

Navigate to the Azure portal and search for Logic Apps. Create a new one to serve as your automated workflow. EventHub events will act as the trigger, and DataVerse will be the output. Choose “Consumption” as the plan type, which is suitable for entry-level development.

Once your Logic App is created, go to Development Tools and access the Logic App designer. The process involves three steps:

2.1 EventHub Trigger

The first step is to connect to EventHub as the trigger. For development purposes, set the check interval to 3 seconds.

2.2 Initialize Variables

The next step is to parse the message from EventHub. The sample message is:

{
  "id": "something"
}

To extract the value using the key “id,” you can use the following expression:

json(decodeBase64(triggerBody()['ContentData']))['id']

2.3 Add a Row to DataVerse

The final step is to use the database connector to add a new row to the corresponding DataVerse table. If the table doesn’t yet exist, navigate to https://make.powerapps.com/, select DataVerse, and then Tables to create one. Use the variable initialized in step 2 to populate the fields.

Once completed, save the workflow.

3. DataVerse

DataVerse serves as a database for storing data in tables. If the Logic App is successfully triggered when a new event is added, you will see a new row in the DataVerse table.

Finally, once all the data is synced to Azure FSI, you can navigate to the Microsoft Cloud Solution Center at https://solutions.microsoft.com/ to select the component you wish to use. For instance, you can select the Unified Customer Profile to manage customer data.

To launch the Dynamics 365 sandbox, navigate to the Solution Center and click the “Launch” button. The Unified Customer Profile app will display populated sample data.

Feel free to reach out if you have any questions about setting this up. Cheers.