Build a streaming data pipeline using Kafka, Spark Streaming, Cassandra, and MySQL
First, create a new project directory named streaming_data_processing, or any name you would like to identify the project as.
Next, make a new file named docker-compose.yml
in the project directory and copy the following into the file:
Then, create another file named spark-defaults.conf in the project directory, which will contain the Spark application configuration properties. We will populate the file later, so for now we can leave it empty.
Last, we will create a new folder named spark_script in the project directory, which will be used to store the Spark application script. For now, we can leave the folder empty.
After you have completed all the steps, the project structure will look like this:
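Assuming you kept the default names from the steps above, the layout is roughly:

```
streaming_data_processing/
├── docker-compose.yml
├── spark-defaults.conf
└── spark_script/
```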
The docker-compose.yml file includes six services: zookeeper, kafka, spark, spark-worker, cassandra, and mysql.
For each service, we define the service name and other configurations such as image, container_name, ports, environment, volumes, depends_on, restart, and command. Below is a brief description of each configuration.
For the zookeeper service, we set ALLOW_ANONYMOUS_LOGIN to yes so that we can connect to the zookeeper service without authentication. Note that this is not recommended for production environments. In the provided configuration, the volumes section demonstrates the use of Docker volumes (not bind mounts).
For the kafka service, we specify several environment variables, described below.

ALLOW_PLAINTEXT_LISTENER=yes
This allows Kafka to accept connections over a plaintext (unencrypted) protocol. By default, Kafka might require secure connections, but setting this variable to “yes” allows plaintext communication.
While enabling plaintext listeners can be convenient for local development or testing, it’s not recommended for production environments due to the lack of encryption. For production, use secure communication protocols like SSL/TLS.
KAFKA_ENABLE_KRAFT=no
KRaft (Kafka Raft Metadata Mode) is a new way of managing metadata without Zookeeper. Setting this to “no” disables KRaft, making Kafka rely on Zookeeper.
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
This variable specifies the hostname and port of the Zookeeper server. Since we are using Docker Compose, we can use the service name zookeeper as the hostname and the default Zookeeper port 2181.
KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29092
This sets up Kafka listeners on two different interfaces: one internal and one external. INTERNAL://:9092 listens on port 9092 for internal traffic, and EXTERNAL://:29092 listens on port 29092 for external traffic.
Here we don’t specify a hostname or IP address for either listener, which means Kafka will listen on all network interfaces on the specified ports.
Use different listeners for internal and external traffic to separate traffic flow. Consider using secure protocols (e.g., SSL) for the external listener to protect data in transit.
KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:29092
These are the addresses Kafka advertises to clients for connecting. Internal clients should connect to kafka:9092, while external clients connect to localhost:29092.
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
The security protocol used for each listener. In our case, we are using the PLAINTEXT protocol for both listeners, meaning the traffic is neither encrypted nor authenticated.
KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
The name of the listener used for communication between brokers. Typically the listener for the internal client is used for inter-broker communication.
We also use the depends_on configuration to ensure that the zookeeper service is started before the kafka service. In the authors’ experience, there have been several occurrences where the kafka service stopped unexpectedly, so we set the restart configuration to always to ensure that the kafka service is restarted automatically if it stops or encounters an error.
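Putting the settings above together, the zookeeper and kafka services might be declared roughly as follows; the image tags, port mappings, and volume names are assumptions, so adjust them to match your own file:

```yaml
# Excerpt of docker-compose.yml (under the top-level services: key)
  zookeeper:
    image: bitnami/zookeeper:latest        # assumed image
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes          # no authentication; development only
    volumes:
      - zookeeper_data:/bitnami/zookeeper  # named Docker volume (declared at the end of the file)

  kafka:
    image: bitnami/kafka:latest            # assumed image
    container_name: kafka
    ports:
      - "29092:29092"                      # external listener exposed to the host
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=no
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
    depends_on:
      - zookeeper
    restart: always                        # restart automatically if the broker stops
```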
For the spark and spark-worker services, we specify SPARK_MODE under the environment configuration: master for the spark service and worker for the spark-worker service.
Besides SPARK_MODE, we also specify the following environment configurations for the spark-worker service:

- SPARK_MASTER_URL=spark://spark:7077 – The URL of the Spark master node. Here we use the service name spark as the hostname and the default Spark master port 7077.
- SPARK_WORKER_MEMORY=1G – The amount of memory to use for the Spark worker node.
- SPARK_WORKER_CORES=1 – The number of cores to use for the Spark worker node.

Under the volumes configuration, we bind mount the spark-defaults.conf file into the containers by providing a direct mapping between a directory on the host and a directory inside the container. This file contains the configuration properties for the Spark application.
We also mount the spark_script directory into the container of the spark service, which will be used to store the Python script that we will submit to the Spark application.
We also add a command configuration to both spark and spark-worker services to install the py4j library, which is required to run the Python script within the Spark application.
The tail -f /dev/null
command is used to keep the container running indefinitely.
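A sketch of the spark and spark-worker services; the Bitnami image, the container paths for the bind mounts, and the web UI port are assumptions:

```yaml
# Excerpt of docker-compose.yml (under the top-level services: key)
  spark:
    image: bitnami/spark:3.4.0             # assumed image
    container_name: spark
    ports:
      - "8080:8080"                        # Spark web UI (assumed mapping)
    environment:
      - SPARK_MODE=master
    volumes:
      - ./spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf  # bind mount (assumed path)
      - ./spark_script:/spark_script                                       # bind mount (assumed path)
    command: bash -c "pip install py4j && tail -f /dev/null"  # install py4j, then keep the container alive

  spark-worker:
    image: bitnami/spark:3.4.0             # assumed image
    container_name: spark-worker
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
    volumes:
      - ./spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf
    depends_on:
      - spark
    command: bash -c "pip install py4j && tail -f /dev/null"
```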
For the mysql service, we set MYSQL_ROOT_PASSWORD to root for simplicity, which will be used later to connect to the MySQL server as the root user.
At the end of the docker-compose.yml
file, we specify all the Docker volume names to ensure that the volume names used in the services’ configurations are recognized properly.
Now, we will create a new keyspace named trading by executing the following command:
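A sketch of the keyspace definition, using the SimpleStrategy and replication factor of one discussed below:

```sql
CREATE KEYSPACE IF NOT EXISTS trading
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
```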
Note that SimpleStrategy and a replication factor of one are only suitable for development purposes. In a production environment with multiple data centers, it is essential to use other replication strategies (i.e., NetworkTopologyStrategy) and replication factors (greater than one) to ensure fault tolerance and high availability.
Next, we will create a new table named real_time_data in the trading keyspace by executing the following command:
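A sketch of the table definition with the columns described below; the data types are assumptions, so adjust them to your data:

```sql
CREATE TABLE IF NOT EXISTS trading.real_time_data (
    id         int PRIMARY KEY,  -- unique identifier for each entry
    created_at timestamp,        -- stored in UTC by default
    ticker     text,
    sector     text,
    volume     bigint,
    market_cap bigint,
    price      double
);
```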
The table consists of the following columns: id, created_at, volume, market_cap, ticker, price, and sector. The id column will serve as the primary key for the table, ensuring uniqueness for each entry.
To illustrate, here is an example of how a row in the table will look:

| id | created_at | ticker | sector | volume | market_cap | price |
|---|---|---|---|---|---|---|
| 11 | 2024-08-25 07:04:45.000000+0000 | TMAS.JK | Logistics & Deliveries | 275674 | 4932000 | 550 |
Note that the values in the created_at column are stored in the UTC timezone, which is the default for Cassandra.
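Next, we switch to MySQL. To open the MySQL shell, you can run something like the following from a new terminal (the container name mysql is an assumption from the compose file):

```bash
docker exec -it mysql mysql -u root -p
```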
We set MYSQL_ROOT_PASSWORD to root in the docker-compose.yml file, so enter root as the password.
After entering the password, you will be logged into the MySQL shell. Now, we will create a new database named trading
by executing the following command:
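For example:

```sql
CREATE DATABASE trading;
USE trading;
```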
Next, we will create a new table named profile, which consists of the following columns: ticker, company_name, sector, and shares. The ticker acts as the primary key. Sector will be the column on which we aggregate and analyze the data. Finally, shares will be used to calculate the market cap if needed. Please note that the data we’ll populate is partially masked (not using actual values) and is for lab purposes only.
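A sketch of the table definition; the column types and lengths are assumptions:

```sql
CREATE TABLE profile (
    ticker       VARCHAR(16) PRIMARY KEY,  -- stock symbol, e.g. TMAS.JK
    company_name VARCHAR(128),
    sector       VARCHAR(64),              -- grouping key for the aggregations
    shares       BIGINT                    -- outstanding shares, used to derive market cap if needed
);
```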
Then, we will create another table named aggregated_data, which will store aggregated volume data in a microbatch fashion. It consists of the following columns: processing_id, processed_at, sector, total_volume, and date. The processing_id serves as the primary key for this table. The processed_at column stores the timestamp at which the microbatch is processed. The date column stores the date of the closing trade, and the sector column will be the grouping key for this table. The total_volume column stores the total volume of traded stocks for a given microbatch. Since the data is aggregated in microbatches, there can be multiple rows with the same date and sector but different processing_id, processed_at, and total_volume values.
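A sketch of the table definition; the column types, and the use of AUTO_INCREMENT for processing_id, are assumptions:

```sql
CREATE TABLE aggregated_data (
    processing_id INT AUTO_INCREMENT PRIMARY KEY,  -- unique id for each aggregated row
    processed_at  TIMESTAMP,                       -- when the microbatch was processed
    sector        VARCHAR(64),
    total_volume  BIGINT,
    date          DATE                             -- date of the closing trade
);
```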
Next, we will create two Kafka topics: test_topic (for testing purposes) and trading_data (for storing the raw transactional data), by executing the following commands:
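A sketch of the commands, run inside the kafka container and assuming kafka-topics.sh is on the container's PATH; the flags are explained below:

```bash
docker exec -it kafka kafka-topics.sh --create --topic test_topic \
  --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1

docker exec -it kafka kafka-topics.sh --create --topic trading_data \
  --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1
```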
kafka-topics.sh is a command-line tool that is implemented as a shell script. It is used to create, alter, list, and describe topics in Kafka. Below is a brief explanation of the flags used in the above commands:
- The --create flag is used to create a new topic.
- The --topic flag is used to specify the name of the topic.
- The --bootstrap-server flag is used to specify the address and port of the Kafka broker to connect to. We set it to kafka:9092 because in the docker-compose.yml file we set KAFKA_CFG_ADVERTISED_LISTENERS to INTERNAL://kafka:9092,EXTERNAL://localhost:29092. Note that using localhost:29092 works as well, because the kafka container can connect to both the internal and external listeners.
- The --partitions flag is used to specify the number of partitions for the topic. The number of partitions determines the parallelism and scalability of data processing in Kafka. In our case, we set the number of partitions to 1 because we are running a single Kafka broker. In a production environment, we would typically have multiple Kafka brokers and would set the number of partitions to a value greater than 1 to ensure higher throughput and better resource distribution.
- The --replication-factor flag is used to specify the number of replicas for the topic. A replication factor of 1 is sufficient for our purposes because we are running a single Kafka broker. In a production environment, we would typically have multiple Kafka brokers and would set the replication factor to a value greater than 1 to ensure fault tolerance.

Next, create a new file named producer.py in the project directory and add the following code:
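The full script is not reproduced here, but the following is a minimal sketch of what producer.py could look like based on the description below. The library choices (kafka-python, cassandra-driver, mysql-connector-python), the get_symbols() helper, the message fields, and the one-second pause are assumptions:

```python
import json
import random
import sys
import time
from datetime import datetime, timezone

from cassandra.cluster import Cluster   # cassandra-driver
from kafka import KafkaProducer         # kafka-python
import mysql.connector                  # mysql-connector-python

# Connection settings for the *external* listeners, since this script runs on the host machine.
KAFKA_BOOTSTRAP_SERVER = "localhost:29092"
CASSANDRA_HOST = "localhost"
MYSQL_CONFIG = {"host": "localhost", "user": "root", "password": "root", "database": "trading"}


def get_last_id():
    """Return the last id stored in the Cassandra table, or 0 if the table is empty."""
    cluster = Cluster([CASSANDRA_HOST])
    session = cluster.connect("trading")
    row = session.execute("SELECT MAX(id) AS last_id FROM real_time_data").one()
    cluster.shutdown()
    return row.last_id or 0


def get_symbols():
    """Fetch the available tickers and their sectors from the MySQL profile table."""
    conn = mysql.connector.connect(**MYSQL_CONFIG)
    cursor = conn.cursor()
    cursor.execute("SELECT ticker, sector FROM profile")
    symbols = cursor.fetchall()
    conn.close()
    return symbols


def produce_message(message_id, ticker, sector):
    """Generate one synthetic trade record."""
    return {
        "id": message_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "ticker": ticker,
        "sector": sector,
        "volume": random.randint(1_000, 500_000),
        "market_cap": random.randint(100_000, 10_000_000),
        "price": random.randint(50, 10_000),
    }


def main():
    kafka_topic = sys.argv[1]  # destination topic passed as the first command-line argument
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # serialize values to JSON
    )
    next_id = get_last_id() + 1
    symbols = get_symbols()
    try:
        while True:  # run until interrupted with Ctrl+C
            message = []
            for ticker, sector in symbols:  # one record per available symbol
                message.append(produce_message(next_id, ticker, sector))
                next_id += 1
            producer.send(kafka_topic, message)  # the value is a JSON array of records
            producer.flush()
            time.sleep(1)
    except KeyboardInterrupt:
        producer.close()


if __name__ == "__main__":
    main()
```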
Ideally, you would store these connection configurations in a .env file and load the values from there. Make sure to adjust these configurations depending on the environment variables you set earlier in your docker-compose.yml file!
It’s also important to note that we are connecting to the Kafka broker from the host machine, so we need to use the hostname and port for the external client.
Next, we define the get_last_id() function, which is used to retrieve the last ID from the Cassandra table. We will need this function later to determine the ID of the next message to be produced when the producer application is started.
We also fetch the available stock symbols from the profile table in our MySQL database.
Finally, we define the main() function, which is used to start the full Kafka producer application. The main() function begins by assigning the value of the first command-line argument (sys.argv[1]) to the KAFKA_TOPIC variable, which will serve as the destination Kafka topic for the messages.
The KafkaProducer
object is created using the previously defined KAFKA_BOOTSTRAP_SERVER
. We also specify the value_serializer
parameter to serialize the message value to JSON format.
For the first message to be produced, we retrieve the last ID from the Cassandra table, which is done by calling the get_last_id()
function.
Within the while loop, we continuously generate and send messages to the Kafka topic. An inner loop iterates through all of the available symbols, and for each symbol the message variable is appended with the value returned by the produce_message() function. The messages are then sent to the Kafka topic using the send() method of the KafkaProducer object. The while loop continues indefinitely until the user interrupts the process by pressing Ctrl+C in the terminal. When this happens, the producer application is terminated.
To verify that the producer.py script is working properly, we can run a console consumer on the topic in one terminal, run the producer.py script in a second terminal, and check that the produced messages show up in the consumer. When finished, stop the process by pressing Ctrl+C in the second terminal.
Recall that the spark-defaults.conf file is bind mounted to the spark and spark-worker containers but is currently empty. The spark_script folder is bind mounted to the spark container, but currently does not contain any files.
In this section, we will populate the spark-defaults.conf file with the necessary configurations and the spark_script folder with the Spark Streaming application script.
First, let’s add the following configurations to the spark-defaults.conf file:
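This likely amounts to a single spark.jars.packages entry listing the connector coordinates explained below:

```
spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,org.apache.kafka:kafka-clients:3.4.0,com.datastax.spark:spark-cassandra-connector_2.12:3.3.0,mysql:mysql-connector-java:8.0.26
```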
- org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 – Enables Spark to use the Spark Structured Streaming API when reading from and writing to Kafka topics.
- org.apache.kafka:kafka-clients:3.4.0 – Enables Spark to interact with Kafka.
- com.datastax.spark:spark-cassandra-connector_2.12:3.3.0 – Enables Spark to interact with the Cassandra database.
- mysql:mysql-connector-java:8.0.26 – Enables Spark to interact with the MySQL database.

Next, create a new file named data_streaming.py in the spark_script folder.
Let’s get started by adding the necessary imports and configurations by placing the following code inside the data_streaming.py file:
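A sketch of those imports and configuration constants; the constant names (other than the kafka:9092 address) are assumptions:

```python
import signal
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, current_timestamp, explode, from_json,
                                   sum as _sum, to_date)
from pyspark.sql.types import (ArrayType, DoubleType, IntegerType, LongType,
                               StringType, StructField, StructType, TimestampType)

# Internal listener address, as advertised to clients inside the Docker network.
KAFKA_BOOTSTRAP_SERVER = "kafka:9092"
KAFKA_TOPIC = "trading_data"
```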
Note that the Kafka bootstrap server is set to kafka:9092, based on the KAFKA_CFG_ADVERTISED_LISTENERS configuration specified for the internal client in the docker-compose.yml file, since the Spark application runs inside the Docker network.
Next, we define the write_to_cassandra
function which will be used to write the raw transactional data to the Cassandra table.
The epoch_id parameter is required for any function passed to the foreachBatch method in Spark Streaming. It is used to uniquely identify each batch of data processed by Spark Streaming.
We set the mode to append
because we want to append the data to the existing data in the Cassandra table.
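A sketch of the function, assuming the Spark Cassandra connector format and the trading.real_time_data table created earlier:

```python
def write_to_cassandra(batch_df, epoch_id):
    """Append one microbatch of raw trades to the Cassandra table."""
    (batch_df.write
        .format("org.apache.spark.sql.cassandra")
        .options(table="real_time_data", keyspace="trading")
        .mode("append")
        .save())
```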
Next, we define the write_to_mysql
function which will be used to write the aggregated data to the MySQL table.
Within this function, we first convert the created_at column (which contains the date and time) to a date column (which contains only the date) using the to_date function.
Then, we aggregate the volume
data and determine the total volume of all traded stocks within that specific microbatch, by grouping the data based on the date
and sector
.
Note that we include the date column in the group by clause to ensure that data from different days are not aggregated together. This might not be particularly relevant to the Indonesian stock market, where the market is not always open, but we’ll keep it there to ensure a correct aggregation. We also add a processed_at column to the aggregated dataframe to indicate the time when the data is processed, using the current_timestamp function, which by default returns the current time in the UTC timezone.
Finally the aggregated dataframe is written to the MySQL table using the write.jdbc
method.
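A sketch of write_to_mysql under these assumptions: the JDBC URL, driver class, and credentials mirror the docker-compose.yml settings and are illustrative:

```python
MYSQL_JDBC_URL = "jdbc:mysql://mysql:3306/trading"   # service name and default port assumed
MYSQL_PROPERTIES = {"user": "root", "password": "root", "driver": "com.mysql.cj.jdbc.Driver"}


def write_to_mysql(batch_df, epoch_id):
    """Aggregate one microbatch by date and sector, then append it to MySQL."""
    aggregated_df = (batch_df
        .withColumn("date", to_date(col("created_at")))    # keep only the closing-trade date
        .groupBy("date", "sector")
        .agg(_sum("volume").alias("total_volume"))          # total traded volume per sector
        .withColumn("processed_at", current_timestamp()))   # processing time of this microbatch
    aggregated_df.write.jdbc(url=MYSQL_JDBC_URL, table="aggregated_data",
                             mode="append", properties=MYSQL_PROPERTIES)
```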
Next, we define the signal_handler function, which handles the interruption signal received by the application. This allows the streaming queries to be stopped gracefully, keeping in mind the trigger interval of write_to_mysql, which we will later set to 60 seconds.
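A minimal sketch of such a handler, assuming it simply stops the active streaming queries before exiting; the original may additionally wait out the 60-second trigger interval:

```python
def signal_handler(sig, frame):
    """Handle Ctrl+C: stop all active streaming queries gracefully, then exit."""
    print("Interrupt received, stopping streaming queries ...")
    spark = SparkSession.getActiveSession()
    if spark is not None:
        for query in spark.streams.active:
            query.stop()
    sys.exit(0)
```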
Finally, we define the main function, which contains the Spark Streaming application itself.
Within the main function, we first create a SparkSession with the application name Spark-Kafka-Cassandra-MySQL and configure the Cassandra host and port.
Then, we set the log level to ERROR
to reduce the amount of logs generated to the console.
We also define the structure of the incoming messages in a variable named schema.
Last, we read the data as string, convert it into JSON, then explode it to flatten the array of JSON objects into individual rows.
We then define two streaming queries using the writeStream method. The first query is responsible for writing the raw transactional data to the Cassandra table, while the second query is responsible for writing the aggregated data to the MySQL table.
For the first query, we use the write_to_cassandra
function as the processing logic within the foreachBatch
method.
The outputMode is set to append
, which means that only the newly generated rows since the last trigger will be written to the sink (Cassandra table).
The trigger is set to a processing time interval of 10 seconds, which determines the frequency at which microbatches are processed.
Note that the lowest limit for the trigger interval is 100 milliseconds; however, we should also consider the underlying infrastructure and the capabilities of the system/cluster.
Similarly, for the second query, we use the write_to_mysql
function as the processing logic within the foreachBatch
method.
The outputMode is also set to append
. The trigger is set to a longer processing time interval of 60 seconds, because we want to accumulate more data before performing the aggregation.
We also register the signal_handler function using the signal.signal function. This allows us to capture the interrupt signal (Ctrl+C) and perform any necessary additional processing before terminating the application.
Finally, we call the awaitAnyTermination
method on the SparkSession streams to ensure that the application continues to run and process data until it is explicitly terminated or encounters an error.
Without this method, the program would reach the end of the script and terminate immediately, without giving the streaming queries a chance to process any data.
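Putting the pieces together, main() could look roughly like the sketch below; the option values, field types, and application name follow the description above, while anything else is an assumption:

```python
def main():
    # Create the SparkSession and point it at the Cassandra container.
    spark = (SparkSession.builder
             .appName("Spark-Kafka-Cassandra-MySQL")
             .config("spark.cassandra.connection.host", "cassandra")
             .config("spark.cassandra.connection.port", "9042")
             .getOrCreate())
    spark.sparkContext.setLogLevel("ERROR")

    # Structure of each trade record inside the JSON array produced by producer.py.
    schema = ArrayType(StructType([
        StructField("id", IntegerType()),
        StructField("created_at", TimestampType()),
        StructField("ticker", StringType()),
        StructField("sector", StringType()),
        StructField("volume", LongType()),
        StructField("market_cap", LongType()),
        StructField("price", DoubleType()),
    ]))

    # Read from Kafka, parse the JSON payload, and flatten the array into individual rows.
    raw_df = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
              .option("subscribe", KAFKA_TOPIC)
              .load())
    trades_df = (raw_df.selectExpr("CAST(value AS STRING) AS value")
                 .select(explode(from_json(col("value"), schema)).alias("trade"))
                 .select("trade.*"))

    # Query 1: raw trades to Cassandra every 10 seconds.
    cassandra_query = (trades_df.writeStream
                       .foreachBatch(write_to_cassandra)
                       .outputMode("append")
                       .trigger(processingTime="10 seconds")
                       .start())

    # Query 2: aggregated volumes to MySQL every 60 seconds.
    mysql_query = (trades_df.writeStream
                   .foreachBatch(write_to_mysql)
                   .outputMode("append")
                   .trigger(processingTime="60 seconds")
                   .start())

    # Handle Ctrl+C gracefully, then block until the queries terminate.
    signal.signal(signal.SIGINT, signal_handler)
    spark.streams.awaitAnyTermination()


if __name__ == "__main__":
    main()
```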
Now, let’s open a new terminal and run the Spark Streaming application inside the spark container:
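For example, something like the following; the container name and the mount path of spark_script come from the compose sketch above and may differ in your setup:

```bash
docker exec -it spark spark-submit /spark_script/data_streaming.py
```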
Next, open another terminal and run the producer.py script on the host machine to produce messages to the trading_data topic:
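Recall that the topic name is passed as the first command-line argument:

```bash
python producer.py trading_data
```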
After a short while, the real_time_data table in Cassandra should contain rows similar to the following:

| id | created_at | ticker | sector | volume | market_cap | price |
|---|---|---|---|---|---|---|
| 1 | 2024-08-25 07:04:45.000000+0000 | TMAS.JK | Logistics & Deliveries | 275674 | 4932000 | 550 |
| 2 | 2024-08-25 07:04:45.000000+0000 | DWGL.JK | Coal | 334567 | 593844 | 540 |
| 3 | 2024-08-25 07:04:45.000000+0000 | ALTO.JK | Beverages | 132423 | 5642123 | 600 |
To inspect the aggregated data, log in to the MySQL shell again (using the root password we set in the docker-compose.yml file). Then run the following commands:
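For example:

```sql
USE trading;
SELECT * FROM aggregated_data;
```

The SELECT should return rows similar to the table below.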
| processing_id | processed_at | sector | total_volume | date |
|---|---|---|---|---|
| 1 | 2024-08-25 07:04:45.000000+0000 | Logistics & Deliveries | 275674123 | 2024-08-25 |
| 2 | 2024-08-25 07:04:45.000000+0000 | Coal | 334561247 | 2024-08-25 |
| 3 | 2024-08-25 07:04:45.000000+0000 | Beverages | 132423566 | 2024-08-25 |
To simplify querying the aggregated data for the dashboard, we will use a view in MySQL. A view is a virtual table that is derived from the result of a query. It does not store any data; rather, it is a stored query that can be treated as a table. Views are an excellent choice for our needs, as they allow us to reuse the same query multiple times without having to rewrite it each time.
We will create a view named volume_today
that contains the total trading volume across different sectors on the current day.
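A sketch of the view definition, assuming we sum total_volume per sector for rows whose date is the current day:

```sql
CREATE VIEW volume_today AS
SELECT sector, SUM(total_volume) AS total_volume
FROM aggregated_data
WHERE date = CURDATE()
GROUP BY sector;
```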
Querying the view returns results similar to the following:

| sector | total_volume |
|---|---|
| Machinery | 86189937 |
| Healthcare Providers | 42701507 |
| Coal | 42613419 |
| Beverages | 40638827 |
| Logistics & Deliveries | 43049845 |
| Construction Materials | 38948489 |
| Commercial Services | 40920640 |
| Chemicals | 43433985 |
| Processed Foods | 40158486 |
First, we will create a new folder in the project directory named dashboard.
Inside the dashboard folder, we’ll create a file named app.py
which will contain the code for our Streamlit application.
Additionally, we need to create a subfolder within dashboard called .streamlit
. This subfolder will house a configuration file named secrets.toml
for our Streamlit application.
You can accomplish this either through the user interface or by executing the following commands in your project directory:
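For example:

```bash
mkdir -p dashboard/.streamlit
touch dashboard/app.py dashboard/.streamlit/secrets.toml
```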
Next, populate the secrets.toml file with the credentials required to connect to the databases.
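The exact keys depend on how app.py reads them; here is a sketch assuming simple [mysql] and [cassandra] sections read via st.secrets (all key names are illustrative):

```toml
[mysql]
host = "localhost"
port = 3306
user = "root"
password = "root"
database = "trading"

[cassandra]
host = "localhost"
port = 9042
keyspace = "trading"
```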
Now, open the app.py file and add the following lines:
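A sketch of those first lines; the connector libraries, the secrets key names, and the page title are assumptions:

```python
import altair as alt
import pandas as pd
import streamlit as st
import mysql.connector
from cassandra.cluster import Cluster

# Database credentials loaded from .streamlit/secrets.toml (section and key names assumed).
MYSQL_SECRETS = st.secrets["mysql"]
CASSANDRA_SECRETS = st.secrets["cassandra"]

# Page layout and the title shown in the browser tab (values are illustrative).
st.set_page_config(layout="wide", page_title="Real-Time Trading Dashboard")
```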
These lines read the database credentials from secrets.toml.
Then, we set the page configuration for our Streamlit application, including the layout and the page title shown in the browser tab.
Next, we define a function called get_view_data that takes the name of a view as an argument and returns the data in the view as a dataframe. We also define get_raw_data, which returns the raw transactional data we store in Cassandra. The ttl argument specifies the duration for which the query results will be cached. Since our microbatch processing interval for the aggregated data is 60 seconds, we set the ttl to 60 seconds as well, indicating that the query results will be cached for 60 seconds before a new query is executed. The second function, on the other hand, executes a query to retrieve all the data from our transactional table in Cassandra. Feel free to adjust these functions as needed; you can even further optimize them by specifying a WHERE clause to avoid retrieving all data at once.
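Building on the imports sketched earlier, the two functions might look like this; the use of st.cache_data for caching and the exact queries are assumptions:

```python
@st.cache_data(ttl=60)  # cache for one 60-second microbatch interval of the aggregated data
def get_view_data(view_name: str) -> pd.DataFrame:
    """Return the contents of a MySQL view as a dataframe."""
    conn = mysql.connector.connect(
        host=MYSQL_SECRETS["host"], user=MYSQL_SECRETS["user"],
        password=MYSQL_SECRETS["password"], database=MYSQL_SECRETS["database"],
    )
    cursor = conn.cursor()
    cursor.execute(f"SELECT * FROM {view_name}")
    df = pd.DataFrame(cursor.fetchall(), columns=[c[0] for c in cursor.description])
    conn.close()
    return df


def get_raw_data() -> pd.DataFrame:
    """Return all raw trades stored in Cassandra as a dataframe."""
    cluster = Cluster([CASSANDRA_SECRETS["host"]])
    session = cluster.connect(CASSANDRA_SECRETS["keyspace"])
    rows = list(session.execute("SELECT * FROM real_time_data"))  # list of Row namedtuples
    cluster.shutdown()
    return pd.DataFrame(rows)
```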
The alt.Chart function creates a base chart object, and mark_bar(color=color) indicates that the chart is a bar chart with the specified color. The encode function is used to specify the x and y axes of the chart:

- The x-axis is mapped to total_volume, and its title is set to Volume.
- The y-axis is mapped to sector. The sort argument is set to '-x' to sort the y-axis in descending order of the x values, and the title of the y-axis is set to None.

The scale of an axis can also be adjusted via scale=alt.Scale().
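A sketch of a helper that builds such a chart; the function name and the color parameter are illustrative:

```python
def make_bar_chart(df: pd.DataFrame, color: str) -> alt.Chart:
    """Horizontal bar chart of total volume per sector, sorted in descending order."""
    return (alt.Chart(df)
            .mark_bar(color=color)
            .encode(
                x=alt.X("total_volume", title="Volume", scale=alt.Scale()),
                y=alt.Y("sector", sort="-x", title=None),
            ))
```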
We then add a title to the dashboard using st.header, which includes the current date. This title is positioned at the top of the dashboard. Afterwards, we can display the plots in the order we would like by calling st.altair_chart().
Now, let’s run the Streamlit app and see the dashboard in action. Execute the following command in the project directory:
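Assuming app.py lives in the dashboard folder created above:

```bash
streamlit run dashboard/app.py
```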