Hazelcast Jet

Use Your Data Instantly At Scale

Hazelcast Jet is a distributed batch and stream processing system that can do stateful computations over massive amounts of data with consistent low latency.

Hazelcast Jet allows you to write modern Java code that focuses purely on data transformation while it does all the heavy lifting of getting the data flowing and computation running across a cluster of nodes. It supports working with both bounded (batch) and unbounded (streaming) data.

These are some of the concerns Jet handles well:

  • Scale Up and Out: Parallelize a computation across all CPU cores and cluster nodes
  • Auto-Rescale: Scale out to newly added nodes and recover from nodes that left or failed
  • Correctness Guarantee: at-least-once and exactly-once processing in the face of node failures

Jet integrates out of the box with many popular data storage systems such as Apache Kafka, Hadoop, relational databases, message queues and many more.

Jet supports a rich set of data transformations, such as windowed aggregations. For example, if your data is GPS location reports from millions of users, Jet can compute every user’s velocity vector by using a sliding window and just a few lines of code.
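To make the velocity example concrete, here is the math such a sliding-window aggregation would compute, sketched in plain Java rather than the Jet API: the velocity estimate is essentially the least-squares slope of position over time within one window. The `VelocityDemo` class and its inputs are made up for illustration.

```java
// Plain-Java sketch of the math behind a sliding-window velocity estimate:
// the least-squares slope of position over time within one window.
// (Illustrative only; Jet would run this per user per window, in parallel.)
public class VelocityDemo {

    // Returns the slope in metres per millisecond for one window of samples.
    public static double velocity(long[] timesMs, double[] positionsM) {
        int n = timesMs.length;
        double sumT = 0, sumP = 0, sumTT = 0, sumTP = 0;
        for (int i = 0; i < n; i++) {
            sumT += timesMs[i];
            sumP += positionsM[i];
            sumTT += (double) timesMs[i] * timesMs[i];
            sumTP += timesMs[i] * positionsM[i];
        }
        // slope = (n*Sum(t*p) - Sum(t)*Sum(p)) / (n*Sum(t*t) - Sum(t)^2)
        return (n * sumTP - sumT * sumP) / (n * sumTT - sumT * sumT);
    }
}
```

For example, three samples one second apart that each advance 5 metres yield a slope of 0.005 m/ms, i.e. 5 m/s. In a Jet pipeline this per-window computation would run inside the sliding-window aggregation, keyed by user.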

Jet also comes with a fully-featured, in-memory key-value store. Use it to cache results, store reference data or as a data source itself.

Build Data Pipelines That Scale

Jet consumes and analyzes millions of events per second or terabytes of data at rest using a unified API. Jet keeps processing data without loss when a node fails, using a replicated in-memory data store. You can add nodes to a live cluster, and they immediately start sharing the computation load.


Process Data from Multiple Systems

Jet provides source and sink connectors for text files, Avro, JSON, Parquet, Apache Kafka, Apache Pulsar, ElasticSearch, relational databases including change data capture, Hadoop, Amazon S3, Azure Data Lake and many more. You can read from one and write to another, or aggregate and join the data in-flight.

Sub-10 ms Latency at the 99.99th Percentile

Jet’s core execution engine was designed for high throughput with low overhead and latency. In rigorous tests, it stayed within a 10-millisecond ceiling at the 99.99th latency percentile for windowed stream aggregation. The engine uses coroutines that implement suspendable computation, allowing it to run hundreds of concurrent jobs on a fixed number of threads.

Production-Ready Out of the Box

Jet nodes automatically discover each other to form a cluster, both in a cloud environment and on your laptop. It is lightweight enough to run on a Raspberry Pi. No ZooKeeper or Hadoop cluster required for production.

Use Jet To

Perform Streaming and Batch Analytics

Ingest data from a wide variety of batch and streaming data sources, perform transforms and stateful computations on it, and write the results to the destination of your choice. You can also cache the result set in memory and serve it directly through thousands of concurrent low-latency queries with fine-grained, key-based access.

React To Real-Time Events

With Jet you can react to real-time events instantly, enriching them and applying inference at scale. A single node is capable of windowing and aggregating 100 Hz sensor data from 100,000 devices with latencies below 10 milliseconds: that’s 10 million events/second. Jet works with many streaming data sources such as Apache Kafka and Apache Pulsar, as well as message brokers such as RabbitMQ.

Build Stateful Workflows

Use Jet to build distributed, stateful workflows. Ingest data, denormalize and process it, run a series of distributed computations, cache the intermediate results in queryable memory, and finally write the results to your destination of choice.


Your First Jet Program

Let’s write some data processing code and have Jet run it for us.

Start a Java Project

By now you should have some version of Java installed (at least 8; version 11 or later is recommended). You can get it from the AdoptOpenJDK website. Create a new project with your preferred build tool, Maven or Gradle, and add the Jet JAR to your build:

plugins {
    id 'application'
}
group 'org.example'
version '1.0-SNAPSHOT'

repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast.jet:hazelcast-jet:4.5.4'
}

application {
    mainClassName = 'org.example.JetJob'
}

Write Your Data Pipeline

Unlike some other paradigms you may have encountered, in the world of distributed stream processing we specify not just the processing steps but also where to pull the data from and where to deliver the results. This means that as soon as you deploy your code, it takes action and starts moving data through the pipeline.

With this in mind let’s start writing code. Instead of connecting to actual systems we’ll start simple, using generated data as the source and your screen as the sink:

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public static void main(String[] args) {
  Pipeline p = Pipeline.create();
  p.readFrom(TestSources.itemStream(10))
   .withoutTimestamps()
   .filter(event -> event.sequence() % 2 == 0)
   .setName("filter out odd numbers")
   .writeTo(Sinks.logger());
}

itemStream() emits SimpleEvents that have an increasing sequence number. The pipeline we wrote will discard every other event and keep those with an even sequence number.

Start Embedded Jet and Run the Pipeline

To create a single Jet node and submit the job to it, add this code to the bottom of the main method:

JetInstance jet = Jet.newJetInstance();
jet.newJob(p).join();

And run it either from your IDE or from command line:

gradle run

This starts a full-featured Jet node right there in the JVM where you call it and submits your pipeline to it. If you were submitting the code to an external Jet cluster, the syntax would be the same, because a JetInstance can represent either an embedded instance or a remote one via a local proxy object. You’d just call a different method to create the client instance.

Once you submit a job, it has a life of its own. It is not coupled to the client that submitted it, so the client can disconnect without affecting the job. In our simple code we call join() on the submitted job to keep the JVM alive while the job runs.

The output should look like this:

11:28:24.039 [INFO] [loggerSink#0] (timestamp=11:28:24.000, sequence=0)
11:28:24.246 [INFO] [loggerSink#0] (timestamp=11:28:24.200, sequence=2)
11:28:24.443 [INFO] [loggerSink#0] (timestamp=11:28:24.400, sequence=4)
11:28:24.647 [INFO] [loggerSink#0] (timestamp=11:28:24.600, sequence=6)
11:28:24.846 [INFO] [loggerSink#0] (timestamp=11:28:24.800, sequence=8)
11:28:25.038 [INFO] [loggerSink#0] (timestamp=11:28:25.000, sequence=10)
11:28:25.241 [INFO] [loggerSink#0] (timestamp=11:28:25.200, sequence=12)
11:28:25.443 [INFO] [loggerSink#0] (timestamp=11:28:25.400, sequence=14)
11:28:25.643 [INFO] [loggerSink#0] (timestamp=11:28:25.600, sequence=16)

Continue to the next step to submit this job to a running cluster instead of running an embedded instance.

Pros: Stream processing is a major trend nowadays, and Jet adds stream processing capability to Hazelcast.
Cons: It is great, but you have to learn how to use it before you can start.

Hazelcast Jet 0.4 Performance Review

Vladimir Schreiner | Jul 11, 2017

High performance has always been a major design goal of Hazelcast Jet. In order to verify various performance aspects of Jet, we’ve introduced the following benchmarks:

  1. Word count, to measure and compare the raw computational power during batch processing.
  2. Trade monitor, which is based on a streaming aggregation and involves windowing and event-time based processing.
  3. 5,000 jobs benchmark, motivated by use cases where thousands of different data processing feeds run on the same cluster. Parallelism is achieved by sharing the resources among many sequential jobs rather than parallelizing a single job.

Let’s have a closer look at the results and the reasoning behind Jet’s performance.

Word Count

Word count is the “hello world” of the big data world and we have chosen it to compare the raw power of data processing frameworks. Mapping and aggregation as found in this problem are building blocks of more complex data processing algorithms and word count itself is simple and highly parallelizable, with no particular shortcuts available. It emphasizes the raw throughput and overhead of the framework itself.
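As a single-node reference point, the whole computation fits in a few lines of plain Java; what frameworks like Jet add is distributing exactly these map and aggregate steps across cores and nodes:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

// Single-node word count: the same map (tokenize) and aggregate (group,
// count) steps that the distributed benchmark parallelizes.
public class WordCount {

    public static Map<String, Long> countWords(String text) {
        return Arrays.stream(text.toLowerCase().split("\\W+"))
                     .filter(w -> !w.isEmpty())
                     .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }
}
```

In the distributed version, the tokenized words are partitioned by key across the cluster so each node counts a disjoint subset, which is why the problem scales so cleanly.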

The chart with benchmark results shows Jet outperforming Apache Spark and Apache Flink by being able to process more than 2 GB per second (640GB data set, 1M distinct keys). See the complete benchmark (hardware, environment, data sets and configuration used).

Benchmark Results

Green Threads

The key to Jet’s performance is its use of Directed Acyclic Graphs (DAGs) [1] as its task-processing model.

With Jet, the number of parallel instances of each vertex (called Processors) can be defined, so we can use all the cores even on the largest machines. With many cores and execution threads, the key to Hazelcast Jet performance is to coordinate them smoothly with cooperative multi-threading. Hazelcast Jet uses “green threads” [2], where cooperative processors run in a loop serviced by the same native thread. This leads to:

  1. Practically zero cost of context switching. There is hardly any logic in the worker thread needed to hand over from one processor to the next.
  2. (Next to) guaranteed core affinity. Processors don’t jump between threads and each thread is highly likely to remain pinned to a core. This means high CPU cache hit rate.
  3. Insight into which processor is ready to run. We can inspect the processor’s input/output queues at very low cost to see whether it can make progress.
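The cooperative model can be sketched in a few lines of plain Java (an illustration of the idea, not Jet's actual internals): many "tasklets" share one native thread, each doing a small, non-blocking slice of work per call.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal cooperative-multithreading sketch: one thread services many
// tasklets in a loop; each call() does a small slice of work and returns
// quickly, so there is no blocking and no OS context switch.
public class CooperativeLoop {

    interface Tasklet {
        boolean call(); // do a little work; return true when fully done
    }

    // Run all tasklets to completion on the calling thread, round-robin.
    public static void runAll(List<Tasklet> tasklets) {
        List<Tasklet> live = new ArrayList<>(tasklets);
        while (!live.isEmpty()) {
            live.removeIf(Tasklet::call); // drop tasklets that finished
        }
    }

    // Example tasklet: counts up to a limit, one increment per call.
    public static Tasklet counter(int limit, int[] out) {
        return () -> ++out[0] >= limit;
    }
}
```

Because handing over from one tasklet to the next is just the next loop iteration, the "context switch" costs almost nothing, and the worker can cheaply check each tasklet's readiness before servicing it, which is the insight points 1-3 above describe.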

Using Embedded Hazelcast IMDG

The benchmark uses the Hadoop Distributed File System as a source and sink. We’ve also added one extra run where Jet reads the data set from a Hazelcast IMap. It’s not an apples-to-apples comparison, of course, but many Jet users will benefit from the Hazelcast IMDG distributed data structures embedded in Jet. As you can see, with this setup Jet is able to process almost 8 GB/sec with data sets that fit into the cluster memory.

Apart from the boost of the in-memory approach, Jet takes advantage of data locality by reading from local partitions.

Streaming Latency (Trade Monitor)

With the 0.4 release, Jet introduced windowing and event-time processing. Both features made Jet stronger and more useful in the infinite stream processing domain. Low latency at scale is the raison d’être of streaming, so our streaming benchmark was focused on measuring the average framework latency in various conditions (message throughput, window set-ups). Kafka was used as an input data source.

Again, Jet outperformed both Spark Streaming and Flink. See the complete streaming benchmark.

Streaming Word Count - Average Latency (lower is better)

The difference becomes more obvious with larger sliding windows, where Jet’s throughput capacity stays flat with increasing window size. Sliding windows are fixed size windows that can overlap. The window slides in fixed increments as time advances, which is called a sliding step. Tumbling windows, being fixed-size non-overlapping windows, are a special case of sliding windows (the sliding step equals the window size).
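The window definitions above can be illustrated in a few lines of plain Java (not the Jet API): a window of `size` items advancing by `step` items, with `step == size` giving the non-overlapping tumbling case.

```java
import java.util.ArrayList;
import java.util.List;

// Windows over a finite array of items: each window covers `size` items
// and the next window starts `step` items later. step < size -> windows
// overlap (sliding); step == size -> no overlap (tumbling).
public class Windows {

    // Sum of each window; windows that would run past the end are skipped.
    public static List<Integer> windowSums(int[] items, int size, int step) {
        List<Integer> sums = new ArrayList<>();
        for (int start = 0; start + size <= items.length; start += step) {
            int sum = 0;
            for (int i = start; i < start + size; i++) sum += items[i];
            sums.add(sum);
        }
        return sums;
    }
}
```

On `{1, 2, 3, 4}`, a sliding window of size 2 with step 1 produces three overlapping sums, while step 2 produces the two tumbling sums. (Jet windows are defined over event time rather than array index, but the overlap structure is the same.)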

Combining and Deducting Frames

The rationale behind performance characteristics of Jet lies in:

  • Efficient windowing storage reducing Java Garbage Collection impact
  • Smooth sliding step handling

Under the hood, Jet uses frames as the building blocks of sliding windows. A frame covers a section of the stream equivalent to one sliding step. When a record arrives, it is added to the respective frame; for each frame, just a rolling accumulator is stored instead of a buffer of all the items. When the window closes, typically because a time or count limit was reached, the respective frames are combined and the computation is executed. This provides a trade-off between the smoothness of sliding and the cost of storage/computation.
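The frame mechanics can be sketched in plain Java (an assumed simplification for illustration, not Jet's actual code), using a count accumulator per frame:

```java
// Frames as rolling accumulators: instead of buffering every event, keep
// one accumulator (here, a count) per frame, where a frame spans one
// sliding step. Closing a window combines the accumulators of its frames.
public class Frames {

    // Assigns each event time to frame index t / frameLenMs and counts it.
    public static long[] accumulateCounts(long[] eventTimesMs, long frameLenMs, int frameCount) {
        long[] frames = new long[frameCount];
        for (long t : eventTimesMs) frames[(int) (t / frameLenMs)]++;
        return frames;
    }

    // Combine the accumulators of the frames that make up one window.
    public static long combine(long[] frames, int firstFrame, int framesPerWindow) {
        long total = 0;
        for (int i = firstFrame; i < firstFrame + framesPerWindow; i++) total += frames[i];
        return total;
    }
}
```

The storage cost is one accumulator per frame per key, regardless of how many events each frame absorbed, which is what keeps garbage-collection pressure low.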

Frames diagram

Moreover, there is a deduct function to optimize sliding-window computations. When the window slides, deduct simply removes the trailing frame from the window and adds the new one. This means two operations per sliding step instead of recomputing the whole window from the underlying frames.

Not every operation can use a rolling accumulator (think of accumulating items for a windowed sort: one has to buffer the items), and not every frame can be deducted from the window (the max operation, for example). However, where applicable, both optimizations provide a significant performance boost.
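For a reversible operation like sum, the deduct optimization can be sketched as follows (plain Java, illustrative only): as the window slides by one frame, add the incoming frame's accumulator and deduct the one that slid out, instead of re-combining every frame.

```java
import java.util.ArrayList;
import java.util.List;

// Deduct optimization for a window that slides by one frame at a time:
// maintain a running sum by adding the new frame and subtracting the
// trailing one - two operations per step, independent of window size.
// Only works for reversible operations (sum yes, max no).
public class Deduct {

    // One window sum per position, computed incrementally.
    public static List<Long> slidingSums(long[] frameSums, int framesPerWindow) {
        List<Long> out = new ArrayList<>();
        long window = 0;
        for (int i = 0; i < frameSums.length; i++) {
            window += frameSums[i];                       // combine new frame
            if (i >= framesPerWindow) {
                window -= frameSums[i - framesPerWindow]; // deduct trailing frame
            }
            if (i >= framesPerWindow - 1) out.add(window);
        }
        return out;
    }
}
```

This is why Jet's throughput stays flat as the window grows: the per-step cost depends on the sliding step, not on the number of frames in the window.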

Pricing

  1. Annual: Jet Enterprise, per application instance, $1.50 M USD

    Jet Enterprise is an application-embeddable stream processing engine designed for fast processing of big data sets.

    Up to 5 nodes; includes full stream processing capabilities & elasticity (including fault tolerance); IMDG integration (embedded), all connectors, cluster management & a full security suite; full continuous operations, cloud & virtualization support (RH OpenShift container platform).

  2. Free trial: Jet Enterprise (Trial Edition), free for 30 days, with the same features as above.