Flink consumes Kafka data and writes the real-time computation results to Redis
1. Scenario
In many big data scenarios, data needs to be computed and stored as a continuous stream. The previous post showed how to consume Kafka data with Flink and implement a WordCount computation; this post writes the real-time results to Redis. As soon as Kafka receives data from a producer, Flink consumes and computes it, and the results are written straight to Redis, so the whole pipeline processes data like flowing water.
2. Code
Add the third-party dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.10</artifactId>
    <version>1.1.5</version>
</dependency>
Note: it is best to keep all the Flink artifacts on a single version (1.4.0 here) and to use 1.1.5 for flink-connector-redis. Using lower or mismatched versions tends to trigger the usual Java dependency problems: package conflicts, the same class behaving differently in different packages, or classes missing from some versions.
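One way to keep the versions aligned is to factor them into Maven properties and reference those from every Flink dependency (a minimal sketch; the property names are my own choice):

<properties>
    <flink.version>1.4.0</flink.version>
    <flink.redis.version>1.1.5</flink.redis.version>
</properties>

<!-- then, inside each dependency: -->
<version>${flink.version}</version>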
The logic code:

package com.scn;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FilnkCostKafka {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        // Kafka connection settings (the localhost values are assumed defaults; adjust to your environment)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        // Consume the "test" topic as a stream of strings
        FlinkKafkaConsumer09<String> myConsumer =
                new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

        // Instantiate FlinkJedisPoolConfig, the class that links Flink to Redis,
        // and set the Redis host (the port defaults to 6379)
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        // Instantiate a RedisSink and attach it via addSink so Flink's results are inserted into Redis
        counts.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisExampleMapper()));

        env.execute("WordCount from Kafka data");
    }

    // Split each line into lowercase words and emit (word, 1) pairs
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    // Specify the Redis key and map the Flink data type to a Redis data type
    public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {

        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "flink");
        }

        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}
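Because the mapper uses RedisCommand.HSET with the additional key "flink", every result is stored as a field of a single Redis hash named flink, with the word as the field name and the running count as the value. Writing the pair (hello, 6), for instance, is equivalent to the following redis-cli commands (the values are purely illustrative):

HSET flink hello 6
HGETALL flink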
Next, write a test class:

package com.scn;

import redis.clients.jedis.Jedis;

public class RedisTest {
    public static void main(String args[]) {
        Jedis jedis = new Jedis("127.0.0.1");
        System.out.println("Server is running: " + jedis.ping());
        System.out.println("result:" + jedis.hgetAll("flink"));
    }
}
3. Testing
Start the Redis server:
redis-server
Run the main method of FilnkCostKafka; if no exception is thrown, the job started without problems. Then send some data from a Kafka producer.
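For example, using the console producer that ships with Kafka (the broker address and topic are assumed to match the consumer configuration above):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Then type a few lines of words, pressing Enter after each line.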
Running the main method of the RedisTest class prints:
Server is running: PONG
result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2}
You can see that the data has flowed into Redis.
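The same result can also be checked directly from the command line with redis-cli:

redis-cli HGETALL flink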