Streaming With DataWeave In Mule 4

What is DataWeave?

DataWeave is the primary transformation language in Mule 4 (Mule 4 ships with DataWeave 2.0 as its default expression language). It supports diverse data formats, including XML, JSON, and CSV, and it lets you transform data from one format to another, apply filters, and perform many other operations. Notably, DataWeave is equipped with streaming capabilities, a key feature that allows efficient handling of substantial amounts of data. This blog delves into DataWeave streaming in Mule 4, exploring its applications for handling large datasets with efficiency.

What is DataWeave streaming, and how does DataWeave support it?


DataWeave streaming is a functionality that enables the processing of extensive data volumes without exhausting memory resources. This feature empowers the analysis of substantial data sets by efficiently breaking them into manageable chunks, eliminating the necessity to load the entire dataset into memory. Instead, the data can be processed as it arrives in smaller segments. This approach minimizes memory consumption, enhances overall performance, and facilitates the efficient management of large volumes of data.

DataWeave streaming proves versatile, applicable to both input and output data scenarios. It can be employed for input data when dealing with sizable files or for output data when generating large files. Additionally, streaming is beneficial in scenarios involving the processing of data sourced from APIs or databases. Leveraging streaming capabilities allows for the efficient handling of substantial data quantities while mitigating memory constraints.

Using the streaming property in the input or output component:

We can enable streaming by setting the streaming reader property to true in the input or output component's MIME type. Here is an example:

<file:read path="input.csv"
    outputMimeType="application/csv; streaming=true" />

In this example, we are using the File Read operation to read a file. Setting the streaming=true reader property in the outputMimeType enables streaming, which means the file will be read in chunks and we can process the data as it arrives.

Turning on deferred mode in the Transform Message:

Mule 4 also requires us to turn on deferred mode in the Transform Message. When the deferred option is used, DataWeave can pass the streaming output to the next message processor as soon as it is produced. Thanks to this behavior, DataWeave in Mule can process data more rapidly and with lower resource and memory usage. Here is an example:

%dw 2.0
output application/json deferred=true
---
payload map ((value, index) -> {
    "index": index,
    "value": value
})

Note that not all DataWeave operations support streaming. Certain operations, such as sort, groupBy, and distinctBy, need access to the whole dataset at once, so they can only be used when all the data is loaded into memory. To use these functions, you must load all of the data into memory and process it in a non-streaming way.
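For instance, a script built only from element-wise operations like map can stream, whereas swapping in a groupBy would force the whole payload into memory. A minimal sketch (the record fields are hypothetical):

%dw 2.0
output application/json deferred=true
---
// map visits one record at a time, so the input is never fully loaded;
// replacing it with `payload groupBy ((r) -> r.country)` would break streaming
payload map ((record) -> { name: record.name, country: record.country })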

Streaming using DataWeave

A non-repeatable stream cannot be consumed more than once. However, we can use DataWeave streaming to produce the Transform Message's result as a stream, which we can then use wherever it is needed. In this case, we obtain the data from the source as a non-repeatable stream and consume it in a Transform Message running in deferred mode. The transformation operates solely on that specific stream or payload, and the result is handed to the next processor or Transform Message, which completes the processing before transferring the data to its final destination.

When the deferred mode is enabled, the transform message accepts the stream but does not consume it; instead, it processes the stream and passes it on to the subsequent processors.

We must consume the streams strictly in the order in which the data arrives; we cannot access them in any other way. This constraint is what lets us take advantage of the non-repeatable stream's high degree of optimization.
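Putting these pieces together, a flow might look like the following sketch (the flow name, file paths, and CSV field are illustrative):

<flow name="streaming-demo-flow">
    <file:read path="records.csv"
        outputMimeType="application/csv; streaming=true">
        <!-- non-repeatable: the stream can be read only once, in arrival order -->
        <non-repeatable-stream />
    </file:read>
    <ee:transform>
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json deferred=true
---
payload map ((record) -> { name: record.name })]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <file:write path="records.json" />
</flow>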

Types of DataWeave Streaming

When it comes to DataWeave streaming, only JSON, XML, and CSV are supported as data formats. As a result, you can benefit from DataWeave streaming whenever your input data is in one of these formats. To tell DataWeave what kind of data it is handling, we must declare the MIME type in the connector.

We must additionally set the streaming reader property to true for each of these three formats. For XML, however, a complex document structure can contain several distinct collections, so we also need to set the collectionPath reader property to specify which particular collection in the XML document should have streaming enabled.
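For example, the XML reader properties on a File Read operation might look like the following sketch (the path and the collectionPath value are illustrative; collectionPath must point at the repeating element in your document):

<file:read path="orders.xml"
    outputMimeType="application/xml; streaming=true; collectionPath=orders.order" />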

How to enable streaming?

Note that streaming is not enabled by default. To stream data in a supported format, you can use the following two configuration properties:
a. For reading source data as a stream, set the streaming reader property to true and select the Non-repeatable stream option in the connector configuration.

b. Set the deferred writer property to true in the subsequent Transform Message so that the result is passed downstream as a stream. The deferred writer attribute allows an output stream to be sent immediately to the flow's subsequent message processor.

What is @StreamCapable()?

1. To ensure that our DataWeave script complies with streaming requirements and the input data is streamable, we can use the @StreamCapable() annotation as a validator. It checks the following conditions:
● The variable must be referenced only once.
● No negative index access (such as [-1]) may be used on the stream, since that would require data that has already been consumed.
The selected data is streamable if all the requirements are satisfied.

2. The DataWeave script must also include an input directive that declares the MIME type of the data source, such as input payload application/xml, in order to comply with the @StreamCapable() annotation.

Here are the examples:

No negative index selector, such as [-1], is used:
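A minimal sketch of a script that passes this check (assuming a JSON array input; the id field is illustrative). Selecting payload[-1] instead would make the validation fail, because it needs data that has already streamed past:

%dw 2.0
@StreamCapable()
input payload application/json streaming=true
output application/json deferred=true
---
// records are read in arrival order only; payload[-1] would fail validation
payload map ((record) -> record.id)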

The variable can be referenced only once:
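And a sketch that satisfies the single-reference rule (the active field is illustrative); referring to payload a second time in the same script would break streamability:

%dw 2.0
@StreamCapable()
input payload application/json streaming=true
output application/json deferred=true
---
// payload appears exactly once; a second reference would force buffering
payload filter ((record) -> record.active)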

DataWeave Streaming DEMO
Case 1: Without Streaming

Here, we are reading a file, transforming it, and writing it to another file. The fileName is defined in a Set Variable using a query parameter, and the file is then read using the File Read operation. Because we are not setting any reader properties or MIME type and are not enabling deferred mode, we are not using DataWeave streaming in this case. Instead, Mule falls back to the repeatable file-store stream, which is the default streaming strategy. As a result, the File Read payload's output is not streamed through the flow, so processing takes more time.
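A sketch of what such a flow might look like (the listener config, flow name, and paths are illustrative):

<flow name="without-streaming-flow">
    <http:listener config-ref="HTTP_Listener_config" path="/read" />
    <set-variable variableName="fileName"
        value="#[attributes.queryParams.fileName]" />
    <!-- no reader properties or MIME type: Mule uses the default
         repeatable file-store stream, not DataWeave streaming -->
    <file:read path="#[vars.fileName]" />
    <ee:transform>
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <file:write path="output.json" />
</flow>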

Output: (screenshot in the original post)
Case 2: Using DataWeave Streaming

In this instance of DataWeave streaming, we declare the MIME type, add the reader properties, and enable deferred mode. We also use the non-repeatable streaming strategy, which allows the stream to be passed through to the final destination without being consumed along the way.
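A sketch of the streaming version of the same flow (again, names and paths are illustrative):

<flow name="dataweave-streaming-flow">
    <http:listener config-ref="HTTP_Listener_config" path="/stream" />
    <set-variable variableName="fileName"
        value="#[attributes.queryParams.fileName]" />
    <file:read path="#[vars.fileName]"
        outputMimeType="application/csv; streaming=true">
        <non-repeatable-stream />
    </file:read>
    <ee:transform>
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json deferred=true
---
payload]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <file:write path="output.json" />
</flow>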

Output: (screenshot in the original post)

Transform Message: (screenshot in the original post)
Output: (screenshot in the original post)

Hence, reading and processing the file takes less time when DataWeave streaming is used.

Processing Large Datasets using Batch Jobs with Streaming:
  • The Batch Job component automatically splits source data and stores it in persistent queues, allowing large data volumes to be processed reliably.
  • Without streaming, Mule 4 batch jobs may perform less effectively. Streaming enables data to be processed as it arrives, in real time, instead of waiting for a whole batch of data to accumulate before processing (see the sketch after this list).
  • This can increase processing speed and effectiveness while also lowering the amount of memory needed to hold the data being processed.
  • Without streaming, batch jobs must load the whole input at once, which can increase processing time, use more resources, and decrease overall efficiency.
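A minimal sketch of a batch job fed by a streamed input (the flow, job, and step names are illustrative):

<flow name="batch-streaming-flow">
    <file:read path="records.csv"
        outputMimeType="application/csv; streaming=true">
        <non-repeatable-stream />
    </file:read>
    <batch:job jobName="processRecordsBatch">
        <batch:process-records>
            <batch:step name="transformStep">
                <!-- each record is processed as it arrives from the stream -->
                <logger level="INFO" message="#[payload]" />
            </batch:step>
        </batch:process-records>
    </batch:job>
</flow>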

Conclusion:

In this blog, we have explored DataWeave streaming in Mule 4 and how it can be used to handle large amounts of data efficiently. We have seen how to use streaming for input and output data, and how to configure streaming in Mule 4. We have also seen a proof of concept that demonstrates how to read a large file and transform it using DataWeave streaming. With DataWeave streaming, we can process large amounts of data without running out of memory, which makes it a powerful tool for data transformation and processing.