Intro
This article provides a practical guide to unit and integration testing in Apache Flink. Using a financial fraud detection application as an example, we demonstrate how to write effective tests to ensure the correctness of your Flink jobs.
Demo App Overview
The demo application processes a stream of financial transactions for different accounts to detect potentially fraudulent activities, leveraging Kafka topics for streaming input and output data.
Remember that the focus is on demonstrating the testing approach, so I won’t go into too much detail about implementing the Flink pipeline itself.
You can find the full source code on GitHub.
FraudDetectionFunction
The main logic of the application is implemented in the FraudDetectionFunction class as a stateful stream processor. It extends Flink’s KeyedProcessFunction because the inbound stream is keyed by the account ID, so all the transactions for a particular account are processed in order.
The FraudDetectionFunction doesn’t interact directly with the execution environment, the Flink setup, or the source and sink connectors. Its primary goal is to implement the business logic when a transaction event arrives. That makes it suitable for unit testing, which we’ll demonstrate soon.
Here’s a snippet of the Flink processElement function implementation:
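If you don’t have the repository open, the following minimal sketch captures the idea; the ValueState<Transaction> handle named lastTransactionState and the Transaction and Alert getters are assumptions, not the exact code:

    @Override
    public void processElement(Transaction transaction,
                               Context context,
                               Collector<Alert> collector) throws Exception {
        // Read the previous transaction for this account from keyed state.
        Transaction previous = lastTransactionState.value();

        // Emit an alert if the previous and current transactions match the fraud rules.
        if (previous != null && isFraudelent(previous, transaction)) {
            collector.collect(new Alert(transaction.getAccountId()));
        }

        // Remember the current transaction for the next event of this account.
        lastTransactionState.update(transaction);
    }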
The actual rules for identifying a transaction as fraudulent are:
- The current transaction amount should be larger than a threshold (the LARGE_AMOUNT constant).
- The previous transaction amount should be smaller than a threshold (the SMALL_AMOUNT constant). The reason for this rule is that fraudsters often make a small test payment first, just to check whether any transaction will succeed. If that check passes, they follow up with an actual transaction for a much larger amount.
- The time interval between the two transactions should be small.
This logic is implemented in the isFraudelent method:
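As a rough illustration of those three rules, the method could look like the sketch below; the constant and getter names (SMALL_AMOUNT, LARGE_AMOUNT, MAX_INTERVAL_MILLIS, getAmount, getTimestamp) are assumptions, so refer to the repository for the real implementation:

    private boolean isFraudelent(Transaction previous, Transaction current) {
        // Rules 1 and 2: a small transaction followed by a large one.
        boolean smallThenLarge = previous.getAmount() < SMALL_AMOUNT
                && current.getAmount() > LARGE_AMOUNT;
        // Rule 3: the two transactions happened within a short time window.
        boolean closeInTime =
                current.getTimestamp() - previous.getTimestamp() <= MAX_INTERVAL_MILLIS;
        return smallThenLarge && closeInTime;
    }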
FraudDetector
The FraudDetector builds the Flink data stream pipeline.
A few important bits:
- It accepts the Flink source and sink connectors in the constructor. For the production version of the app, these are both tied to the Kafka topics in the main method, but the design allows us to pass fake source and sink connectors for our tests. We’ll see how this works later.
- The build method initializes the environment. It configures the data streaming pipeline, which starts from the source connector, passes the data through the FraudDetectionFunction, and finally outputs potentially fraudulent transactions to the sink connector; a sketch of this wiring follows below.
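To make that wiring concrete, here is a simplified sketch of what such a build method can look like; the operator names, the key selector, and the no-watermarks strategy are assumptions for illustration only:

    public StreamExecutionEnvironment build() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "transactions-source")
                .keyBy(Transaction::getAccountId)      // partition the stream by account
                .process(new FraudDetectionFunction()) // stateful fraud detection
                .name("fraud-detection")
                .sinkTo(sink)                          // emit alerts to the configured sink
                .name("fraud-alerts-sink");

        return env;
    }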
Running the App Locally in Docker
To demonstrate the application, let’s run it in Docker locally.
Create the Required Docker Containers
Use this docker-compose file to spin up the required Docker containers for Zookeeper, Kafka, and Flink. Go to the directory with the docker-compose file and run the command:
docker-compose up -d
Create the Kafka Topics
Next, create the input and output Kafka topics. We’ll use the input topic to push some transactions and the output topic to collect the fraud alerts.
After the containers are up and running, execute the following commands to create the two topics:
docker exec -it transactionsdatastream-kafka-1 kafka-topics --create --topic transactions-input --bootstrap-server kafka:9092
docker exec -it transactionsdatastream-kafka-1 kafka-topics --create --topic transactions-output --bootstrap-server kafka:9092
Deploy the Flink App
Build the FraudDetection project by going to the root of the project and running the command:
mvn clean package
This will generate a target folder with a fat jar:
Copy the jar into the Docker jobmanager container:
docker cp target/flink-fraud-detection-1.0.jar jobmanager:/job.jar
Next, set the required environment variable for the Kafka brokers and run the Flink job:
docker exec -it jobmanager bash -c "export KAFKA_BROKERS=kafka:9092 && flink run -c frauddetection.FraudDetector /job.jar"
Push Data to the Input Kafka Topic
Then, to see the behavior of the Flink job manually, run the Python script I’ve prepared, which sends some Kafka messages to the input topic. For reference, here are the transactions that will be generated:
You can see that the transactions for accounts 6 and 8 should be identified as fraudulent based on the rules defined in a previous section.
A sample output after running the Python script looks like this:
Check Results and Monitor the Application
Now that we have the data loaded, let’s ensure everything works as expected by inspecting the content of the input and output Kafka topics. I’ll use the kafka-console-consumer tool from the Kafka Docker container.
To read the input messages, run the following command:
docker exec -it transactionsdatastream-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic transactions-input --from-beginning
You should see the messages we just published to the topic:
After these messages flow through the Flink pipeline, the output topic should contain alerts only for the accounts with fraudulent transactions, which should be accounts 6 and 8:
docker exec -it transactionsdatastream-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic transactions-output --from-beginning
Also, the Flink Docker container exposes the Flink dashboard at localhost:8081. If you go to that address, you should see the Fraud Detection job is submitted and processing records:
Now that you know how the sample app works, let’s dive into the testing part.
Unit Tests for the Stateful Flink Function
Flink provides a convenient way, in the form of a “test harness,” to unit test stateful stream processors like ours. Check this article for more context.
In our fraud detection application, we’re building unit tests for the FraudDetectionFunction.
The test is implemented in the FraudDetectionFunctionTest class, which uses KeyedOneInputStreamOperatorTestHarness to simulate the Flink streaming environment.
The setup looks like this:
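Since the exact code lives in the repository, here is a rough version of such a setup, assuming JUnit 4 and a String account key:

    private KeyedOneInputStreamOperatorTestHarness<String, Transaction, Alert> harness;

    @Before
    public void setUp() throws Exception {
        harness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new FraudDetectionFunction()),
                Transaction::getAccountId,   // key selector: partition by account ID
                Types.STRING);               // type information for the key
        harness.open();
    }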
The class includes several fine-grained tests for the business logic. For example, here is one of them that tests the fraud detection:
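A simplified version of such a test, with made-up amounts and constructor/getter shapes, might look like this:

    @Test
    public void alertsWhenSmallTransactionIsFollowedByLargeOne() throws Exception {
        // A tiny transaction followed shortly by a large one should trigger an alert.
        harness.processElement(new Transaction("account-6", 0.5, 1_000L), 1_000L);
        harness.processElement(new Transaction("account-6", 600.0, 2_000L), 2_000L);

        List<Alert> alerts = harness.extractOutputValues();
        assertEquals(1, alerts.size());
        assertEquals("account-6", alerts.get(0).getAccountId());
    }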
Running all the tests, you should see them passing:
Integration Test for the Flink Job
Integration tests in Flink ensure that the end-to-end job runs as expected. Essentially, we are exercising the whole pipeline, starting from reading data from the source connector and asserting the output data in the sink connector is as expected.
I encourage you to review the full implementation of the FraudDetectorTest class, but let’s discuss the main points.
Embedded Flink Cluster for Tests
As stated in the Flink testing guide, Apache Flink provides a JUnit rule, MiniClusterWithClientResource, for testing complete jobs against a local, embedded mini cluster. Such tests are useful because you can extend the test coverage of your application, including data serialization/deserialization, parallelism handling, custom state management, and more.
In our FraudDetectorTest class, we initialize the mini cluster as a class rule so all the tests share the same cluster:
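Following the Flink documentation, the class rule is declared roughly like this (the parallelism settings are just reasonable defaults):

    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());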
This is important since the startup and shutdown of Flink clusters can be expensive.
Source Implementation for the Integration Test
Recall that the FraudDetector accepts the Source and Sink in its constructor, so mock implementations can be plugged in during testing:
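In outline, the class can be shaped like this (a sketch; the generic parameters are simplified):

    public class FraudDetector {
        private final Source<Transaction, ?, ?> source;
        private final Sink<Alert> sink;

        public FraudDetector(Source<Transaction, ?, ?> source, Sink<Alert> sink) {
            this.source = source;
            this.sink = sink;
        }

        // build() wires the pipeline from source to sink, as sketched earlier.
    }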
To build a test source data generator, we’re using the built-in DataGeneratorSource class. The generator function is implemented using the createTransactionGenerator method. We are essentially producing the same input data that we pushed to Kafka via the Python script in the previous section:
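Assuming createTransactionGenerator maps a sequence index to a Transaction, the test source can be wired up roughly like this (the element count constant is illustrative):

    DataGeneratorSource<Transaction> testSource = new DataGeneratorSource<>(
            FraudDetectorTest::createTransactionGenerator, // GeneratorFunction<Long, Transaction>
            TRANSACTION_COUNT,                             // how many test elements to emit
            TypeInformation.of(Transaction.class));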
Remember, the goal of the integration test is not to test every edge case for the business logic. For this purpose, we have the unit tests for the processing function. With the integration test, we are exercising cross-cutting concerns and the operability of the pipeline as a whole.
Sink Implementation for the Integration Test
For the integration test, we are using the fairly simple TestFraudAlertSink class, which contains an inner static class, CollectingSinkWriter, with a static variable values that collects all the elements received at the output of the Flink pipeline. Via this variable, we retrieve the alerts for fraudulent transactions, making sure they are identified correctly.
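Condensed, the sink looks something like the sketch below (imports and error handling omitted; the real class is in the repository):

    public class TestFraudAlertSink implements Sink<Alert> {

        @Override
        public SinkWriter<Alert> createWriter(InitContext context) {
            return new CollectingSinkWriter();
        }

        static class CollectingSinkWriter implements SinkWriter<Alert> {
            // Static so the test can inspect the collected alerts after the job finishes.
            static final List<Alert> values = Collections.synchronizedList(new ArrayList<>());

            @Override
            public void write(Alert element, Context context) {
                values.add(element);
            }

            @Override
            public void flush(boolean endOfInput) {
                // Nothing to flush; elements are stored in memory as they arrive.
            }

            @Override
            public void close() {
                // No resources to release.
            }
        }
    }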
You can run the integration test to ensure it passes successfully:
Summary
This article provided a practical guide to unit and integration testing in Apache Flink, using a fraud detection application as an example. We covered:
- Unit Tests: Ensuring the Flink processing function works correctly in isolation by utilizing the built-in KeyedOneInputStreamOperatorTestHarness, which helps with Flink state management in our unit tests.
- Integration Tests: Verifying the end-to-end job execution using MiniClusterWithClientResource to simulate a real Flink cluster. This involves using mock sources and sinks to run the entire data processing pipeline, from transaction ingestion to fraud alert generation, and ensuring it functions as expected. This approach tests the integration of all components and their interactions within a realistic environment.
Thanks for reading, and see you next time!