Unit and Integration Testing in Apache Flink

Intro

This article provides a practical guide to unit and integration testing in Apache Flink. Using a financial fraud detection application as an example, we demonstrate how to write effective tests to ensure the correctness of your Flink jobs.

Demo App Overview

The demo application processes a stream of financial transactions for different accounts to detect potentially fraudulent activities, leveraging Kafka topics for streaming input and output data.

Remember that the focus is on demonstrating the testing approach, so I won’t go into too much detail about implementing the Flink pipeline itself.

You can find the full source code on GitHub.

FraudDetectionFunction

The main logic of the application is implemented in the FraudDetectionFunction class as a stateful stream processor. It extends Flink's KeyedProcessFunction because the inbound stream is keyed by account ID, so all the transactions for a particular account are processed in order.

The FraudDetectionFunction doesn’t interact directly with the execution environment, the Flink setup, or the source and sink connectors. Its primary goal is to implement the business logic when a transaction event arrives. That makes it suitable for unit testing, which we’ll demonstrate soon.

Here’s a snippet of the Flink processElement function implementation:

The actual rules for identifying a transaction as fraudulent are:

  1. The current transaction amount should be larger than some value (LARGE_AMOUNT constant).
  2. The previous transaction should be smaller than some value (SMALL_AMOUNT constant). The reason for this rule is that fraudsters often try to initiate an initial payment with a small amount just to check if any transaction will succeed. If this check passes, they will make an actual transaction with a bigger amount.
  3. The time interval between the two transactions should be small.

This logic is implemented in the isFraudelent method.
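As an illustration, the check could be sketched like this; the threshold values and the ONE_MINUTE constant are assumptions, so refer to the repository for the real ones:

// Sketch of the rule check; the concrete values are illustrative only.
private static final double SMALL_AMOUNT = 1.00;    // rule 2: the previous transaction is small
private static final double LARGE_AMOUNT = 500.00;  // rule 1: the current transaction is large
private static final long ONE_MINUTE = 60 * 1000;   // rule 3: both happen within a short interval

private boolean isFraudelent(Transaction previous, Transaction current) {
    boolean previousWasSmall = previous.getAmount() < SMALL_AMOUNT;
    boolean currentIsLarge = current.getAmount() > LARGE_AMOUNT;
    boolean closeInTime = current.getTimestamp() - previous.getTimestamp() <= ONE_MINUTE;
    return previousWasSmall && currentIsLarge && closeInTime;
}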

FraudDetector

The FraudDetector builds the Flink data stream pipeline.

A few important bits:

  • It accepts the Flink source and sink connectors in the constructor. For the production version of the app, these are both tied to the Kafka topics in the main method, but the design allows us to pass fake source and sink connectors for our tests. We’ll see how this works later.
  • The build method initializes the environment. It configures the data streaming pipeline, which starts from the source connector, passes the data through the FraudDetectionFunction, and finally outputs potentially fraudulent transactions to the sink connector (see the sketch after this list).
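Here is a rough sketch of what that wiring can look like; the method signature and the operator details are assumptions based on the description above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch of the pipeline wiring; the real FraudDetector in the repository may differ in details.
public class FraudDetector {

    private final Source<Transaction, ?, ?> source;
    private final Sink<Alert> sink;

    public FraudDetector(Source<Transaction, ?, ?> source, Sink<Alert> sink) {
        this.source = source;
        this.sink = sink;
    }

    public StreamExecutionEnvironment build() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "transactions")
           .keyBy(Transaction::getAccountId)       // partition the stream by account ID
           .process(new FraudDetectionFunction())  // apply the fraud rules
           .sinkTo(sink);                          // emit alerts to the sink connector

        return env;
    }
}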

Running the App Locally in Docker

To demonstrate the application, let’s run it in Docker locally.

Create the Required Docker Containers

Use this docker-compose file to spin up the required Docker containers for Zookeeper, Kafka, and Flink. Go to the directory with the docker-compose file and run the command:

docker-compose up -d

Create the Kafka Topics

Next, create the input and output Kafka topics. We’ll use the input topic to push some transactions and the output topic to collect the fraud alerts.

After the containers are up and running, execute the following commands to create the two topics:

docker exec -it transactionsdatastream-kafka-1 kafka-topics --create --topic transactions-input --bootstrap-server kafka:9092
docker exec -it transactionsdatastream-kafka-1 kafka-topics --create --topic transactions-output --bootstrap-server kafka:9092

Deploy the Flink App

Build the FraudDetection project by going to the root of the project and running the command:

mvn clean package

This will generate a target folder containing the fat jar flink-fraud-detection-1.0.jar:

Copy the jar into the Docker jobmanager container:

docker cp target/flink-fraud-detection-1.0.jar jobmanager:/job.jar

Next, set the required environment variable for the Kafka brokers and run the Flink job:

docker exec -it jobmanager bash -c "export KAFKA_BROKERS=kafka:9092 && flink run -c frauddetection.FraudDetector /job.jar"

Push Data to the Input Kafka Topic

Then, to see the Flink job in action, run the Python script I've prepared, which sends some messages to the input Kafka topic. For reference, here are the transactions it will generate:

You can see that the transactions for accounts 6 and 8 should be identified as fraudulent based on the rules defined in a previous section.

A sample output after running the Python script looks like this:

Check Results and Monitor the Application

Now that we have the data loaded, let’s ensure everything works as expected by inspecting the content of the input and output Kafka topics. I’ll use the kafka-console-consumer tool from the Kafka Docker container.

To read the input messages, run the following command:

docker exec -it transactionsdatastream-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic transactions-input --from-beginning

You should see the messages we just published to the topic:

After these messages flow through the Flink pipeline, the output topic should contain alerts only for the accounts with fraudulent transactions, namely accounts 6 and 8:

docker exec -it transactionsdatastream-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic transactions-output --from-beginning

The Flink Docker container also exposes the Flink dashboard at localhost:8081. If you go to that address, you should see that the Fraud Detection job has been submitted and is processing records:

Now that you know how the sample app works, let’s dive into the testing part.

Unit Tests for the Stateful Flink Function

Flink provides a convenient way, in the form of a “test harness,” to unit test stateful stream processors like ours. Check this article for more context.

In our fraud detection application, we’re building unit tests for the FraudDetectionFunction.

The test is implemented in the FraudDetectionFunctionTest class, which uses KeyedOneInputStreamOperatorTestHarness to simulate the Flink streaming environment.

The setup wraps the FraudDetectionFunction in the test harness.
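A minimal version, assuming JUnit 4 and keying by account ID, might look like this sketch:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Before;

// Sketch of the harness setup; the key selector and key type mirror the pipeline's keying by account ID.
public class FraudDetectionFunctionTest {

    private KeyedOneInputStreamOperatorTestHarness<Long, Transaction, Alert> harness;

    @Before
    public void setUp() throws Exception {
        harness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new FraudDetectionFunction()),
                Transaction::getAccountId,  // same key selector as the production pipeline
                Types.LONG);                // key type: the account ID
        harness.open();
    }
}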

The class includes several fine-grained tests for the business logic, for example one that exercises the fraud-detection rule.
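A test along these lines might look like the sketch below (living in the same test class as the harness above); the Transaction constructor arguments, the amounts, and the Alert accessor are assumptions chosen to trigger the rules:

@Test
public void detectsFraudForSmallThenLargeTransaction() throws Exception {
    // A small transaction followed shortly by a large one for the same account.
    harness.processElement(new Transaction(1L, 0.5, 1_000L), 1_000L);
    harness.processElement(new Transaction(1L, 600.0, 2_000L), 2_000L);

    // Exactly one alert should be emitted, and it should reference account 1.
    List<Alert> alerts = harness.extractOutputValues();
    assertEquals(1, alerts.size());
    assertEquals(1L, alerts.get(0).getAccountId());
}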

Running all the tests, you should see them passing:

Integration Test for the Flink Job

Integration tests in Flink ensure that the end-to-end job runs as expected. Essentially, we exercise the whole pipeline: reading data from the source connector, running it through the job, and asserting that the data arriving at the sink connector matches what we expect.

I encourage you to review the full implementation of the FraudDetectorTest class, but let’s discuss the main points.

Embedded Flink Cluster for Tests

As stated in the Flink Testing guide, Apache Flink provides a JUnit rule, MiniClusterWithClientResource, for testing complete jobs against a local, embedded mini cluster. Such tests are useful because they extend the test coverage of your application to data serialization/deserialization, parallelism handling, custom state management, and more.

In our FraudDetectorTest class, we initialize the mini cluster as a class rule so all the tests share the same cluster.
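The snippet below follows the pattern from the Flink testing guide; the numbers of task managers and slots are example values:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class FraudDetectorTest {

    // Shared embedded Flink cluster for all tests in this class.
    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());
}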

This is important since the startup and shutdown of Flink clusters can be expensive.

Source Implementation for the Integration Test

Recall that the FraudDetector accepts the Source and Sink in its constructor, so mock implementations can be plugged in during testing.

To build a test data source, we use the built-in DataGeneratorSource class, with the generator function implemented in the createTransactionGenerator method. We produce essentially the same input data that we pushed to Kafka via the Python script in a previous section.
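For orientation, constructing such a source can look roughly like this; TEST_TRANSACTION_COUNT and createTransactionGenerator are assumed names standing in for the repository's actual code:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

// Sketch: a bounded DataGeneratorSource driven by a generator function that maps
// a sequence index to a test Transaction.
GeneratorFunction<Long, Transaction> generator = createTransactionGenerator();

DataGeneratorSource<Transaction> testSource = new DataGeneratorSource<>(
        generator,
        TEST_TRANSACTION_COUNT,                  // bounded: emits exactly this many elements
        TypeInformation.of(Transaction.class));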

Remember, the goal of the integration test is not to test every edge case for the business logic. For this purpose, we have the unit tests for the processing function. With the integration test, we are exercising cross-cutting concerns and the operability of the pipeline as a whole.

Sink Implementation for the Integration Test

For the integration test, we use the fairly simple TestFraudAlertSink class. It contains a static inner class, CollectingSinkWriter, with a static values variable that collects every element emitted at the output of the Flink pipeline. Through this variable, we retrieve the alerts for fraudulent transactions and make sure they are identified correctly.
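A collecting sink along those lines can be sketched as follows; this is an illustration against the Sink V2 interfaces rather than the exact repository class, and the createWriter signature differs slightly across Flink versions:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

// Sketch of a collecting test sink; the repository's TestFraudAlertSink may differ in details.
public class TestFraudAlertSink implements Sink<Alert> {

    @Override
    public SinkWriter<Alert> createWriter(InitContext context) {
        return new CollectingSinkWriter();
    }

    static class CollectingSinkWriter implements SinkWriter<Alert> {

        // Static so the test can read the collected alerts after the job finishes;
        // synchronized because sink writers may run in parallel subtasks.
        static final List<Alert> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void write(Alert element, Context context) {
            values.add(element);
        }

        @Override
        public void flush(boolean endOfInput) {
            // nothing to flush; alerts are collected in memory
        }

        @Override
        public void close() {
            // nothing to close
        }
    }
}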

You can run the integration test to ensure it passes successfully:

Summary

This article provided a practical guide to unit and integration testing in Apache Flink, using a fraud detection application as an example. We covered:

  1. Unit Tests: Verifying that the Flink processing function works correctly in isolation by utilizing the built-in KeyedOneInputStreamOperatorTestHarness, which takes care of Flink state management in our unit tests.
  2. Integration Tests: Verifying the end-to-end job execution using MiniClusterWithClientResource to simulate a real Flink cluster. This involves using mock sources and sinks to run the entire data processing pipeline, from transaction ingestion to fraud alert generation, and ensuring it functions as expected. This approach tests the integration of all components and their interactions within a realistic environment.

Thanks for reading, and see you next time!

Resources

  1. GitHub Repo
  2. Flink DataStream Testing
