How to deploy for At Least Once Message Delivery?
As introduced in "What is At Least Once Message Delivery", Gearpump has a built-in KafkaSource. To get at least once message delivery, users should deploy a Kafka cluster as the offset store alongside the Gearpump cluster.
Here's an example of deploying a local Kafka cluster.
- Download the latest Kafka from the official website and extract it to a local directory ($KAFKA_HOME).
- Boot up the single-node ZooKeeper instance packaged with Kafka.
  $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
- Start a Kafka broker.
  $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
- When creating an offset store for KafkaSource, set the ZooKeeper connect string to localhost:2181 and the broker list to localhost:9092 in KafkaStorageFactory.
  val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092")
  val source = new KafkaSource("topic1", "localhost:2181", offsetStorageFactory)
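Before passing these addresses to KafkaStorageFactory, it can help to confirm that both services are actually listening. The following is a minimal sketch that uses only the JDK socket API. EndpointCheck and isReachable are hypothetical helper names, not part of Gearpump or Kafka, and the ports assume the default local setup described above.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object EndpointCheck {
  // Hypothetical helper (not a Gearpump API): returns true if a TCP
  // connection to host:port can be opened within timeoutMs milliseconds.
  def isReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  def main(args: Array[String]): Unit = {
    // Addresses assumed from the local deployment steps above.
    val endpoints = Seq("ZooKeeper" -> 2181, "Kafka broker" -> 9092)
    endpoints.foreach { case (name, port) =>
      val status = if (isReachable("localhost", port)) "reachable" else "NOT reachable"
      println(s"$name at localhost:$port is $status")
    }
  }
}
```

An open port only shows that something is listening. It does not prove that the broker has registered with ZooKeeper, so treat this as a quick sanity check rather than a full health check.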
How to deploy for Exactly Once Message Delivery?
Exactly Once Message Delivery requires both an offset store and a checkpoint store. For the offset store, a Kafka cluster should be deployed as in the previous section. As for the checkpoint store, Gearpump has built-in support for Hadoop file systems, like HDFS. Hence, users should deploy an HDFS cluster alongside the Gearpump cluster.
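To see why both stores are needed, consider a deliberately simplified, in-memory model of recovery. This is an illustrative sketch only and not Gearpump's implementation: ExactlyOnceSketch, its two maps, and the summing task are hypothetical stand-ins. The checkpoint store restores the state as of a given time, and the offset store tells the source where to resume, so each message contributes to the result exactly once.

```scala
import scala.collection.mutable

object ExactlyOnceSketch {
  final case class Message(timestamp: Long, value: Int)

  // A replayable source: a message's index in the log is its offset.
  val log: Vector[Message] =
    Vector(Message(1L, 10), Message(2L, 20), Message(3L, 30), Message(4L, 40))

  // Toy offset store: checkpoint time -> offset of the first message not in the state.
  val offsetStore = mutable.Map.empty[Long, Int]
  // Toy checkpoint store: checkpoint time -> running sum at that time.
  val checkpointStore = mutable.Map.empty[Long, Int]

  // Sums log(from until end) on top of `state`. Just before handling the first
  // message with timestamp >= checkpointTime, it records the state and the offset.
  def run(state: Int, from: Int, end: Int, checkpointTime: Long): Int = {
    var sum = state
    for (offset <- from until end) {
      val msg = log(offset)
      if (msg.timestamp >= checkpointTime && !checkpointStore.contains(checkpointTime)) {
        checkpointStore(checkpointTime) = sum
        offsetStore(checkpointTime) = offset
      }
      sum += msg.value
    }
    sum
  }

  // Restores the checkpointed state and replays from the matching offset.
  def recover(checkpointTime: Long): Int =
    run(checkpointStore(checkpointTime), offsetStore(checkpointTime), log.length, checkpointTime)

  def main(args: Array[String]): Unit = {
    // The first run "crashes" after three messages; its in-memory result is lost.
    run(state = 0, from = 0, end = 3, checkpointTime = 3L)
    println(recover(3L)) // prints 100, the sum of all four values
  }
}
```

With only the offset store, the replay would start from the same offset but from an empty state, so the state built before the crash would be lost. With only the checkpoint store, the source would not know where to resume, and messages could be applied twice or skipped.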
Here's an example of deploying a local HDFS cluster.
- Download Hadoop 2.6 from the official website and extract it to a local directory ($HADOOP_HOME).
- Add the following configuration to $HADOOP_HOME/etc/hadoop/core-site.xml.
  <configuration>
    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://localhost:9000</value>
    </property>
  </configuration>
- Start HDFS.
  $HADOOP_HOME/sbin/start-dfs.sh
- When creating a HadoopCheckpointStore, set the Hadoop configuration as in core-site.xml.
  val hadoopConfig = new Configuration
  hadoopConfig.set("fs.defaultFS", "hdfs://localhost:9000")
  val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig, new FileSizeRotation(1000))
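To confirm that the configuration actually points at a running file system before handing it to the checkpoint store factory, a small check with the standard Hadoop FileSystem API can be used. This is a sketch under assumptions: HdfsCheck and canAccessRoot are hypothetical names, not part of Gearpump, and the Hadoop client libraries are assumed to be on the classpath.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.Try

object HdfsCheck {
  // Hypothetical helper (not a Gearpump API): returns true if the root
  // directory of the file system named by fs.defaultFS can be queried.
  def canAccessRoot(conf: Configuration): Boolean =
    Try(FileSystem.get(conf).exists(new Path("/"))).getOrElse(false)

  def main(args: Array[String]): Unit = {
    val hadoopConfig = new Configuration
    // Same value as in the core-site.xml step above.
    hadoopConfig.set("fs.defaultFS", "hdfs://localhost:9000")
    println(s"HDFS reachable: ${canAccessRoot(hadoopConfig)}")
  }
}
```

If the NameNode is not running, the Hadoop client may retry the connection for a while before the check returns false.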