Spark—结构化流Structured Streaming编程指南-Streaming Query_spark structured streaming foreachbatch-程序员宅基地

技术标签: spark  big data  大数据  

定义了最终结果DataFrame/Dataset之后,剩下的就是开始流计算了,为此,必须使用Dataset.writeStream()方法返回的DataStreamWriter。而且必须在这个接口中指定一个或多个以下内容:

1.输出接收器的详细信息:数据格式、位置等。

2.输出模式:指定写入输出接收器的内容。

3.查询名称:可选,为标识指定查询的唯一名称。

4.触发间隔:可选,指定触发间隔。如果没有指定,系统将在之前的处理完成后立即检查新数据的可用性。如果由于之前的处理没有完成而错过了触发时间,那么系统将立即触发处理。

5.检查点位置:对于一些可以保证端到端容错的输出接收器,指定系统将写入所有检查点信息的位置。这应该是一个hdfs兼容的容错文件系统中的目录。

输出模式
输出模式有以下几种类型:

1.Append模式(默认)——这是默认模式,在这种模式下,只有在最后一个触发器之后添加到结果表中的新行才会输出到接收器。只适用于那些添加到结果表中的行永远不会更改的查询。因此,这种模式保证每行只输出一次。例如,只有select、where、map、flatMap、filter、join等的查询将支持Append模式。

2.Complete模式——每次触发器执行后都将整个结果表输出到接收器后。

3.Update模式——(从Spark 2.1.1开始可用)只有在最后一个触发器之后更新到结果表中的行才会输出到接收器。

不同类型的流查询支持不同的输出模式。下面是适配的矩阵表。

输出接收器

下面是几种类型的内置输出接收器:

1.文件接收器-----将输出存储到目录

df.writeStream
    .format("parquet")  // 可以是"orc", "json", "csv"等等格式
    .option("path", "path/to/destination/dir")
    .start()

2.Kafka 接收器——将输出发送到Kafka

writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "t_user_action")
    .start()

3.Foreach接收器——在输出中的记录上运行任意计算。有关更多细节,请参见后面。 

writeStream
    .foreach(...)
    .start()

4.控制台接收器(console)——每次有触发器时将输出输出到console/stdout。这两种模式都支持Append和Complete输出模式。这应该用于在低数据量上进行调试,因为在每个触发器之后,将收集整个输出并存储在Driver程序的内存中

writeStream
    .format("console")
    .start()

5.内存接收器(memory)——输出作为内存中的表存储在内存中。这两种模式都支持Append和Complete输出模式。整个输出被收集并存储在Driver程序的内存中,这应该用于在低数据量上进行调试。因此,请谨慎使用。 

writeStream
    .format("memory")
    .queryName("tableName")
    .start()

有些接收器不能容错,因为它们不能保证输出的持久性,并且只用于调试目的。请参阅前面博客关于容错语义的部分。以下是Spark中所有接收器的详细信息。

 需要注意的是,必须手动调用start()方法来开始查询的执行,start()会返回一个StreamingQuery对象,这个对象是连续运行执行的句柄。我们可以使用它来管理查询,下面会详细介绍。现在看几个例子。

// ========== 没有聚合操作的DF ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")   
 
// 打印新数据到console
noAggDF
  .writeStream
  .format("console")
  .start()
 
// 打印新数据到Parquet文件
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()
 
// ========== 有聚合操作的DF ==========
val aggDF = df.groupBy("device").count()
 
// 将更新后的聚合打印到console
aggDF
  .writeStream
  .format("console")
  .outputMode("complete") 
  .start()
 
// 将所有聚合保存在内存的表中
aggDF
  .writeStream
  .format("memory")
  .outputMode("complete")
  .queryName("aggregates")    // queryName的值就是Table的名称
  .start()
 
spark.sql("select * from aggregates").show()   // 交互式地查询保存在内存中的表

使用Foreach和ForeachBatch

foreach和foreachBatch操作允许我们对流查询的输出应用任意操作和编写逻辑,他们的用法稍微有些不一样的地方——虽然foreach允许在每一行上定制写逻辑,但是foreachBatch允许在每个微批处理的输出上应用任意操作和定制逻辑

ForeachBatch

foreachBatch(…)允许指定在流查询的每个微批处理的输出数据上执行的函数。从Spark 2.4开始支持这个特性。它接受两个参数:具有微批处理的输出数据的DataFrame或Dataset,以及微批处理的唯一ID。

streamingDF.writeStream
    .foreachBatch((batchDF: DataFrame, batchId: Long) => {
     // Transform and write batchDF 
    }
).start()

使用foreach批处理,可以执行以下操作。

1.重复用现有批处理数据源——对于许多存储系统,可能还没有现成可用的流接收器,但可能已经存在用于批处理查询的数据写入器。使用foreachBatch,可以在每个微批处理的输出上使用批处理数据编写器。

2.写入多个位置——如果想要将流查询的输出写入多个位置,那么只需多次写入输出DataFrame/Dataset即可。但是,每次写入尝试都可能导致重新计算输出数据(包括可能重新读取输入数据)。为了避免重新计算,我们应该缓存输出DataFrame/Dataset,将其写入多个位置,然后取消缓存。

streamingDF.writeStream
    .foreachBatch ((batchDF: DataFrame, batchId: Long) => {
        batchDF.persist() 
        batchDF.write.format(…).save(…) // 位置1 
        batchDF.write.format(…).save(…) // 位置2 
        batchDF.unpersist()
    }
)

3.应用额外的DataFrame操作——流Dataframe中不支持许多DataFrame和Dataset操作,因为Spark不支持在这些情况下生成增量数据。使用foreachBatch,可以对每个微批处理输出应用非streaming的dataframe的一些算子操作。但是,必须自己考虑执行该操作的端到端语义。

注意事项

默认情况下,foreachBatch只提供至少一次的写保证。但是,我们可以使用提供给该函数的batchId来消除重复的输出,并获得一次准确的保证。

foreachBatch不支持连续处理模式,因为它基本上依赖于流查询的微批处理执行。如果要使用连续模式编写数据,就使用foreach。

Foreach

如果foreachBatch不是一个好的选择(例如,对应的批处理数据写入器不存在,或者是连续处理模式),那么还可以使用foreach来表示定制的写入器逻辑。具体地说,可以将数据写入逻辑分为三种方法来表示:open、process和close。从Spark 2.4开始,foreach可以使用。

在scala中必须继承类ForeachWriter :

streamingDatasetOfString.writeStream
    .foreach( new ForeachWriter[String] {
 
        def open(partitionId: Long, version: Long): Boolean = {
          // 建立连接
        }
 
        def process(record: String): Unit = {
          // 往连接中写入数据
        }
 
        def close(errorOrNull: Throwable): Unit = {
          // 关闭连接
        }
  }
).start()

在启动流查询时,Spark以以下方式调用函数或对象的方法:

1.对象的单一副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责处理以分布式方式生成的数据的一个分区。

2.对象必须是可序列化的,因为每个任务将获得所提供对象的一个新的序列化反序列化副本。因此,强烈建议对写入数据进行初始化(例如打开连接或启动事务)是在调用open()方法之后完成的,这意味着任务已经准备好生成数据。

3.如果存在open()方法并且在调用后成功返回(不管返回值如何),那么一定要调用close()方法(如果它存在),除非JVM或Python进程中途崩溃。

4.对应的生命周期:

     a)方法open(partitionId, epochId)被调用。

     b)如果open(…)返回true,那么对于分区和批处理/epoch中的每一行,将调用方法process(row)。

     c)方法close(error)在处理行时被调用,如果出现错误。

注意:open()方法中的partitionId和epochId可用于在故障导致某些输入数据的重新处理时消除生成的数据的重复。这取决于查询的执行模式。如果流查询是以微批处理模式执行的,那么由唯一元组(partition_id, epoch_id)表示的每个分区都保证具有相同的数据。因此,(partition_id, epoch_id)可用于取消重复和/或事务提交数据,并实现一次准确的保证。但是,如果流查询是在连续模式下执行的,那么这一保证不成立,因此不应该用于重复数据删除。

Streaming Queries触发器
流查询的触发器设置定义了流数据处理的时间,该查询是作为具有固定批处理间隔的微批处理查询执行,还是作为连续处理查询执行。下面是支持的不同类型的触发器:

请看代码示例:

import org.apache.spark.sql.streaming.Trigger
 
// 默认触发器 (一次微批处理完成后立即执行下一次)
df.writeStream
  .format("console")
  .start()
 
// 具有2秒微批处理间隔的ProcessingTime触发器
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
 
// 只执行一次
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()
 
// 具有一秒检查点间隔的连续触发器
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

管理流查询(Managing Streaming Queries)

启动查询时创建的StreamingQuery对象可用于监视和管理查询。下面列出常用的一些管理方法:

val query = df.writeStream.format("console").start()   // 获得StreamingQuery对象
 
query.id          // 获取正在运行的查询的唯一标识符,该标识符在从检查点获取数据重新启动时保持不变
 
query.runId       // 获取查询此次运行的唯一id,该id将在每次启动/重新启动时变更
 
query.name        // 获取自动生成的名称或用户指定的名称
 
query.explain()   // 打印查询的详细说明
 
query.stop()      // 停止查询
 
query.awaitTermination()   // 阻塞查询,直到使用stop()或错误来终止查询
 
query.exception       // 如果查询已被错误终止,则获取异常信息
 
query.recentProgress  // 流查询的最近更新的数组
 
query.lastProgress    // 流查询的最近一次更新

我们可以在一个SparkSession中启动任意数量的查询。它们将同时运行,共享集群资源。还可以使用sparkSession.streams()来获得StreamingQueryManager,它可以用来管理当前运行中的查询。

val spark: SparkSession = ...
 
spark.streams.active    // 取当前运行中的流查询的列表
 
spark.streams.get(id)   // 通过流查询惟一id获取流查询对象
 
spark.streams.awaitAnyTermination()   // 阻塞,直到其中任何一个终止

监控流查询(Monitoring Streaming Queries)
有多种方法可以监视运行中的流查询。我们可以使用Spark的Dropwizard指标将指标推送到外部系统,也可以通过编程访问它们。

1.系统指标的直接获取
可以使用streamingQuery.lastProgress()和streamingQuery.status()直接获取运行中查询的当前状态和指标。lastProgress()返回一个StreamingQueryProgress对象,它包含关于流的最后一个触发器所进行的更新的所有信息——处理了哪些数据、处理速率、延迟等等。还有streamingQuery.recentProgress,它返回最近几次更新的数组。

此外,streamingQuery.status()返回一个StreamingQueryStatus对象,它提供了关于查询正在执行的操作的信息——是活动的触发器还是正在处理的数据,等等。
 

val query: StreamingQuery = ...
 
println(query.lastProgress)
 
/* 打印出的数据格式如下
 
{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/
 
 
println(query.status)
 
/*  打印出的数据格式如下
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/

2.使用异步api以编程方式报告系统指标
    我们还可以通过给SparkSession设置一个StreamingQueryListener监听器来异步的监听所有的查询,这样在启动和停止查询以及运行中的查询中进行更新时,都会回调监听器的方法,使用方法是自定义一个StreamingQueryListener监听器并通过sparkSession.streams.attachListenter()方法来注册监听器。代码示例:
 

val spark: SparkSession = ...
 
spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})

3.使用Dropwizard报告系统指标

Spark支持使用Dropwizard库报告系统指标。要同时报告结构化流查询的指标,必须启用配置spark.sql.streaming.metricsEnabled

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true")

在启用此配置之后,在SparkSession中启动的所有查询都将通过Dropwizard向已配置的任何接收器(例如Ganglia, Graphite, JMX等)报告指标。 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/lehsyh/article/details/124661411

智能推荐

oracle 12c 集群安装后的检查_12c查看crs状态-程序员宅基地

文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态

解决jupyter notebook无法找到虚拟环境的问题_jupyter没有pytorch环境-程序员宅基地

文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境

国内安装scoop的保姆教程_scoop-cn-程序员宅基地

文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn

Element ui colorpicker在Vue中的使用_vue el-color-picker-程序员宅基地

文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker

迅为iTOP-4412精英版之烧写内核移植后的镜像_exynos 4412 刷机-程序员宅基地

文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机

Linux系统配置jdk_linux配置jdk-程序员宅基地

文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk

随便推点

matlab(4):特殊符号的输入_matlab微米怎么输入-程序员宅基地

文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入

C语言程序设计-文件(打开与关闭、顺序、二进制读写)-程序员宅基地

文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。‍ Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。

Touchdesigner自学笔记之三_touchdesigner怎么让一个模型跟着鼠标移动-程序员宅基地

文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动

【附源码】基于java的校园停车场管理系统的设计与实现61m0e9计算机毕设SSM_基于java技术的停车场管理系统实现与设计-程序员宅基地

文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计

Android系统播放器MediaPlayer源码分析_android多媒体播放源码分析 时序图-程序员宅基地

文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;amp;gt;Jni-&amp;amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图

java 数据结构与算法 ——快速排序法-程序员宅基地

文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法