Basic Concepts
DataSource and DataSink are the two main concepts Gearpump uses to connect with the outside world.
DataSource
DataSource is the starting point of a streaming processing flow.
DataSink
DataSink is the end point of a streaming processing flow.
Implemented Connectors
Implemented DataSources
Currently, the following DataSource implementations are supported:
Name | Description |
---|---|
CollectionDataSource | Converts a collection to a data source that replays it endlessly, e.g. Seq(1, 2, 3) will output 1,2,3,1,2,3,... |
KafkaSource | Reads from Kafka. |
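For instance, a collection source can be wired into an application like any other source. The snippet below is only a sketch: the CollectionDataSource constructor shown here is an assumption, so check the class in the Gearpump version you use.

```scala
// a sketch: replays Seq(1, 2, 3) endlessly as 1,2,3,1,2,3,...
// (the constructor form is an assumption; verify against your Gearpump version)
val collectionSource = new CollectionDataSource(Seq(1, 2, 3))
val collectionProcessor = DataSourceProcessor(collectionSource, parallelism)
```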
Implemented DataSinks
Currently, the following DataSink implementations are supported:
Name | Description |
---|---|
HBaseSink | Writes the message to HBase. The message to write must be an HBase Put or a tuple of (rowKey, family, column, value). |
KafkaSink | Writes to Kafka. |
Use of Connectors
Use of Kafka connectors
To use the Kafka connectors, you first need to add the gearpump-external-kafka library dependency to your application:
SBT

```scala
"io.gearpump" %% "gearpump-external-kafka" % "0.9.0"
```
XML

```xml
<dependency>
  <groupId>io.gearpump</groupId>
  <artifactId>gearpump-external-kafka</artifactId>
  <version>0.9.0</version>
</dependency>
```
This is a simple example that reads from Kafka and writes back to it using KafkaSource and KafkaSink. Users can optionally set a CheckpointStoreFactory so that Kafka offsets are checkpointed and at-least-once message delivery is guaranteed.
Low-level API
```scala
val appConfig = UserConfig.empty
val props = new Properties
props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)

// create a Kafka source whose offsets are checkpointed to a Kafka-backed store
val source = new KafkaSource(sourceTopic, props)
val checkpointStoreFactory = new KafkaStoreFactory(props)
source.setCheckpointStore(checkpointStoreFactory)
val sourceProcessor = DataSourceProcessor(source, sourceNum)

// create a Kafka sink
val sink = new KafkaSink(sinkTopic, props)
val sinkProcessor = DataSinkProcessor(sink, sinkNum)

// wire source and sink into an application DAG
val partitioner = new ShufflePartitioner
val computation = sourceProcessor ~ partitioner ~> sinkProcessor
val app = StreamApplication(appName, Graph(computation), appConfig)
```
High-level API
```scala
val props = new Properties
val appName = "KafkaDSL"
props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)

val app = StreamApp(appName, context)

if (atLeastOnce) {
  // checkpoint Kafka offsets for at-least-once delivery
  val checkpointStoreFactory = new KafkaStoreFactory(props)
  KafkaDSL.createAtLeastOnceStream(app, sourceTopic, checkpointStoreFactory, props, sourceNum)
    .writeToKafka(sinkTopic, props, sinkNum)
} else {
  KafkaDSL.createAtMostOnceStream(app, sourceTopic, props, sourceNum)
    .writeToKafka(sinkTopic, props, sinkNum)
}
```
In the above examples, configurations are set through Java properties and shared by KafkaSource, KafkaSink and KafkaCheckpointStoreFactory. Their configurations can also be defined separately, as listed below.
KafkaSource configurations
Name | Description | Type | Default |
---|---|---|---|
KafkaConfig.ZOOKEEPER_CONNECT_CONFIG | Zookeeper connect string for Kafka topic management | String | |
KafkaConfig.CLIENT_ID_CONFIG | An id string to pass to the server when making requests | String | "" |
KafkaConfig.GROUP_ID_CONFIG | A string that uniquely identifies a set of consumers within the same consumer group | String | "" |
KafkaConfig.FETCH_SLEEP_MS_CONFIG | The amount of time (ms) to sleep when the fetch threshold is hit | Int | 100 |
KafkaConfig.FETCH_THRESHOLD_CONFIG | Size of the internal queue that buffers Kafka messages; fetching stops and sleeps when the threshold is hit | Int | 10000 |
KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG | Partition grouper class to group partitions among source tasks | Class | DefaultPartitionGrouper |
KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG | Message decoder class to decode raw bytes from Kafka | Class | DefaultMessageDecoder |
KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG | Timestamp filter class to filter out late messages | Class | DefaultTimeStampFilter |
KafkaSink configurations
Name | Description | Type | Default |
---|---|---|---|
KafkaConfig.BOOTSTRAP_SERVERS_CONFIG | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | String | |
KafkaConfig.CLIENT_ID_CONFIG | An id string to pass to the server when making requests | String | "" |
KafkaCheckpointStoreFactory configurations
Name | Description | Type | Default |
---|---|---|---|
KafkaConfig.ZOOKEEPER_CONNECT_CONFIG | Zookeeper connect string for Kafka topic management | String | |
KafkaConfig.BOOTSTRAP_SERVERS_CONFIG | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | String | |
KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX | Name prefix for the checkpoint store | String | "" |
KafkaConfig.REPLICATION_FACTOR | Replication factor for the checkpoint store topic | Int | 1 |
Use of HBaseSink
To use HBaseSink in your application, you first need to add the gearpump-external-hbase library dependency:
SBT

```scala
"io.gearpump" %% "gearpump-external-hbase" % "0.9.0"
```
XML

```xml
<dependency>
  <groupId>io.gearpump</groupId>
  <artifactId>gearpump-external-hbase</artifactId>
  <version>0.9.0</version>
</dependency>
```
To connect to HBase, you need to provide the following:
- the HBase configuration, which tells the sink which HBase service to connect to
- the table name (you must create the table yourself; see the HBase documentation)
Then, you can use HBaseSink in your application:
```scala
// create the HBase data sink
val sink = HBaseSink(UserConfig.empty, tableName, HBaseConfiguration.create())

// create a Gearpump Processor
val sinkProcessor = DataSinkProcessor(sink, parallelism)
```
Alternatively, you can write to HBase through the DSL:

```scala
// assume `stream` is a normal Stream in the DSL
stream.writeToHbase(UserConfig.empty, tableName, parallelism, "write to HBase")
```
You can tune the connection to HBase via the HBase configuration passed in. If none is passed, Gearpump will search the local classpath for a valid HBase configuration (hbase-site.xml).
Note: due to the issue discussed here, you may need to create an additional configuration for your HBase sink:
```scala
def hadoopConfig = {
  val conf = new Configuration()
  conf.set("hbase.zookeeper.quorum", "zookeeperHost")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf
}

val sink = HBaseSink(UserConfig.empty, tableName, hadoopConfig)
```
How to implement your own DataSource
To implement your own DataSource, you need to implement two things:
- the data source itself
- a helper class to ease its use in the DSL
Implement your own DataSource
You need to implement a class derived from io.gearpump.streaming.transaction.api.TimeReplayableSource.
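For illustration, below is a minimal sketch of a replayable source backed by an in-memory collection. The open/read/close lifecycle and the exact method signatures are assumptions based on the general DataSource contract; verify them against the TimeReplayableSource trait in the Gearpump version you build against.

```scala
import io.gearpump.Message
import io.gearpump.streaming.task.TaskContext
import io.gearpump.streaming.transaction.api.TimeReplayableSource

// A toy source that replays a fixed sequence of strings.
// NOTE: the method signatures below are assumptions based on the general
// DataSource lifecycle; verify them against your Gearpump version.
class MemorySource(data: Seq[String]) extends TimeReplayableSource {
  private var iter: Iterator[String] = Iterator.empty

  // called once per task before reading starts; startTime allows
  // replay from a checkpointed position (ignored in this toy example)
  def open(context: TaskContext, startTime: Long): Unit = {
    iter = data.iterator
  }

  // return up to batchSize messages per invocation
  def read(batchSize: Int): List[Message] = {
    iter.take(batchSize).map(s => Message(s, System.currentTimeMillis())).toList
  }

  def close(): Unit = ()
}
```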
Implement DSL helper (Optional)
If you would like your source to be readily usable from the DSL, it is better to also implement a DSL helper.
You can refer to KafkaDSLUtil in the Gearpump source as an example. Below is a code snippet from KafkaDSLUtil:
```scala
object KafkaDSLUtil {
  def createStream[T](
      app: StreamApp,
      topics: String,
      parallelism: Int,
      description: String,
      properties: Properties): dsl.Stream[T] = {
    app.source[T](new KafkaSource(topics, properties), parallelism, description)
  }
}
```
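With such a helper in scope, an application can create a Kafka-backed stream in a single call. The snippet below follows the signature shown above; the topic name and parallelism are placeholder values.

```scala
// a sketch: create a DSL stream from Kafka via the helper above
val stream = KafkaDSLUtil.createStream[String](
  app, "my-topic", 1, "read from Kafka", props)
```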
How to implement your own DataSink
To implement your own DataSink, you need to implement two things:
- the data sink itself
- a helper class to make it easy to use in the DSL
Implement your own DataSink
You need to implement a class derived from io.gearpump.streaming.sink.DataSink.
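For illustration, below is a minimal sketch of a sink that prints every message to stdout. It assumes the usual open/write/close lifecycle for DataSink; verify the exact signatures against the trait in your Gearpump version.

```scala
import io.gearpump.Message
import io.gearpump.streaming.sink.DataSink
import io.gearpump.streaming.task.TaskContext

// A toy sink that writes every message to stdout.
// NOTE: the open/write/close signatures are assumptions based on the
// general DataSink lifecycle; verify them against your Gearpump version.
class ConsoleSink extends DataSink {
  def open(context: TaskContext): Unit = ()

  def write(message: Message): Unit = println(message.msg)

  def close(): Unit = ()
}
```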
Implement DSL helper (Optional)
If you would like your sink to be readily usable from the DSL, it is better to also implement a DSL helper.
You can refer to HBaseDSLSink in the Gearpump source as an example. Below is a code snippet from HBaseDSLSink:
```scala
class HBaseDSLSink[T](stream: Stream[T]) {
  def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String): Stream[T] = {
    stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, description)
  }

  def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, parallism: Int, description: String): Stream[T] = {
    stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, userConfig, description)
  }
}

object HBaseDSLSink {
  implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = {
    new HBaseDSLSink[T](stream)
  }
}
```
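With the implicit conversion in scope, writeToHbase becomes available directly on any DSL stream, which is the form used in the HBase example earlier:

```scala
// bring the implicit conversion into scope, then write from the DSL
import HBaseDSLSink._
stream.writeToHbase(UserConfig.empty, tableName, parallelism, "write to HBase")
```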