小本本介绍
大家好,我是本际云服务器推荐网的小编小本本,今天给大家分享一下关于开源组件Flink中redisSink业务需求的改造方案。

Flink中redisSink业务需求改造分享
由于项目需求,我们在使用Flink生成实时数据后需要立即展示数据,同时实时数据量较大,因此我们使用Redis作为数据库来存储最近一小段时间的数据,以供前台进行实时展示。但是在Flink提供的redisSink包(flink-connector-redis_2.11)使用过程中发现,与业务需求不符合。主要存在以下2个问题。
- Flink提供的包入redis时不能设置失效时间,业务场景希望只保存最新的实时数据一段时间,需要设置失效时间,让数据过期,从而提高资源的利用和性能。
- 业务使用的是Redis集群模式,Flink提供的包支持集群但是没有设置密码的地方,业务的Redis集群都有设置密码的强制要求。
因此,我们对Flink提供RedisSink包进行改造,具体的改造方案如下:
- 增加可以设置失效时间的方法:首先重新定义RedisMapper接口类,增加一个获取失效时间的接口。然后在RedisCommand中增加一个带失效时间的命令SETEX(BasicRedisDataType.STRING)接口。接着在RedisSink的invoke方法中提供获取失效时间的代码,以及SETNX命令的实现。最后,在RedisClusterContainer集群实现该方法,以使其支持带失效时间的Sink方法。
- 增加缺失的参数:关于增加设置密码的方式,我们在JedisClusterConfig中添加了一个密码选项,并提供set方法。然后在初始化的时候增加一个构造方法。最终通过以及调用原生的redis连接包来创建了一个集群对象redis.clients.jedis.JedisCluster,在RedisCommandsContainerBuilder中添加带密码的redis集群创建方式。
为了灵活改造,我们直接将整个jar包拉取了下来,并重新定义了一个属于自己的Sink定制包。
总结
本文主要结合项目实际使用场景,针对flink与redis的Sink包(flink-connector-redis_2.11)不能满足业务需求的问题进行了一系列的改造。我们主要通过提供设置密码的redis集群初始化以及在sink过程中可以指定redis键值的失效时长,最终通过改造的flink-connector-redis在项目flink任务中得到了良好的使用效果。
原创文章,作者:小编小本本,如若转载,请注明出处:https://www.benjiyun.com/yunzhujiyunwei/vps-yunwei/5859.html
