结构化数据流-foreachBath操作
我是本际云服务器推荐网的小编小本本,今天给大家分享一些关于结构化数据流中的foreach和foreachBatch操作。

foreachBatch
foreachBatch允许您对流式查询的每个微批的输出数据指定执行的函数,自spark2.4,scala,java,python都支持这一点,他有两个参数,微批数据集和微批的唯一ID。使用foreachBatch可以做以下工作:
- 重用批处理数据源-对于许多存储系统,可能还没有可用的流式接收器,但可能已经存在批处理查询的接收器,所以使用foreachBatch可以再每个微批次输出中使用批接收器。
- 写入多个存储器:可能你需要将流式查询的输出写入到多个位置,则只需多次写入即可,但是这样可能导致重新计算(包括可能重新读取数据)。为了避免重新计算,应该缓存数据集,将其写入多个存储器后再取消缓存,例如:批处理中的许多操作在流式查询中不支持,因为spark不支持在这些情况下生成增量计划,使用foreachBatch可以将流式查询转换成了一个个微批来处理,但是你必须自己考虑执行这些操作端到端语义。
- 注意:默认情况下foreachBatch只提供至少一次写入保证,但是你可以使用batchID作为消除重复的方法,并获得一次写入保证。foreachBatch不适用于连续处理模式,因为他基本上依赖于微批处理,如果以连续模式写入数据,可以使用foreach。
foreach
如果不存在相应的批处理数据接收器,或者不存在连续处理模式,则可以使用foreach来自定义编写器逻辑,你可以将数据写入逻辑分为三个方法,open,process和close。自spark2.4scala,java,python都支持这一点。流查询启动后,spark按以下方式调用函数或对象的方法:此对象的一个副本负责查询中单个任务生成的所有数据,也就是说,一个实例负责处理以分布式方式生成的数据的一个分区。此对象必须是可序列化的,因为每个任务将获取此对象的副本,需要进行反序列化,强烈建议任何初始化一定在调用open()方法之后完成,意味着已经准备好了数据。这些方法的生命周期如下:
- 对于每个分区(包含partition_id),每个微批(包含epoch_id)。open(partitionId,epochId)方法被调用,如果open()方法返回true,则对于分区和微批中的每一行将调用process(),然后调用close(error),在处理时抛出错误(如果有)。
- close()方法被调用(如果有),如果open()方法被调用并返回成功(不管返回true还是false),除非JVM或python进程崩溃。
- spark不保证输出相同,因此无法使用(partitionId,epochId)实现重复数据消除,如果需要对输出执行重复数据消除,请尝试使用foreachBatch。
以上就是关于结构化数据流-foreach和foreachBatch操作的详细介绍,希望对大家有所帮助。
本文作者:潘宗昊
本文来源:IT那活儿(上海新炬王翦团队)
原创文章,作者:小编小本本,如若转载,请注明出处:https://www.benjiyun.com/yunzhujiyunwei/vps-yunwei/6343.html
