Spark实战第二版(涵盖Spark3.0)-第15章. 聚合数据

登峰大数据IP属地: 安徽
字数 2,420阅读 511

15.3 使用UDAFs构建自定义聚合

在前面的小节中,您快速回顾了聚合数据,在简单数据集上执行了聚合操作,并最终处理了真实的数据。在这些操作中,使用了包括max()、avg()和min()在内的标准聚合操作。Spark并没有实现所有可能的数据聚合。

在本节中,您将通过构建自己的聚合函数来扩展Spark。用户定义的聚合函数(UDAFs),可以执行自定义聚合。

想象一下下面的用例:您是一个在线零售商,想要给客户忠诚度积分。每位顾客每订购一件商品可得一分,但每次订购最多可得三分。

解决这个问题的一种方法是在您的order dataframe中添加一个point列并匹配点归属规则,但是您将使用一个聚合函数来解决这个问题(您可以自己轻松地使用point列来解决这个问题)。

图15.6显示了将要使用的数据集。它类似于本章第一节15.1节中所使用的方法。

图15.6 应用自定义UDAF来计算每个客户每个订单获得多少忠诚点

操作的结果是一个客户及其关联点的列表,如下面的清单所示。

#清单15.23客户及其关联积分

+------------+--------+-----+-------------+-----+

| firstName|lastName|state|sum(quantity)|point|

+------------+--------+-----+-------------+-----+| 

Ginni      | Rometty| NY  | 7          | 3  |

|Jean-Georges| Perrin | NC  | 3          | 3  |

| Holden    | Karau  | CA  | 10          | 6  |

|Jean-Georges| Perrin | CA  | 4          | 3  | 

+------------+--------+-----+-------------+-----+

实验

这个实验的代码在net.jgp.books.spark.ch13.lab400_udaf包。该应用程序称为PointsPerOrderApp.java,UDAF代码在PointAttributionUdaf.java中

调用UDAF并不比调用任何聚合函数复杂。有几个步骤:

在Spark会话中使用udf().register()方法注册这个函数。

使用callUDF()函数调用该函数。

下面的清单显示了调用UDAF的过程。

//清单15.24注册和调用UDAF

package net.jgp.books.spark.ch13.lab400_udaf;

import static org.apache.spark.sql.functions.callUDF;

import static org.apache.spark.sql.functions.col;

import static org.apache.spark.sql.functions.sum;

import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

public class PointsPerOrderApp {

    public static void main(String[] args ) {

      PointsPerOrderApp app = new PointsPerOrderApp();

      app .start();

    }

    private void start() {

      SparkSession spark = SparkSession.builder()

            .appName( "Orders loyalty point" )

            .master( "local[*]" )

            .getOrCreate();

      spark

.udf().register( "pointAttribution" , new PointAttributionUdaf());

      Dataset<Row> df = spark .read().format( "csv" )

            .option( "header" , true )

            .option( "inferSchema" , true )

            .load( "data/orders/orders.csv" );

      Dataset<Row> pointDf = df

col( "firstName" ), col( "lastName" ), col( "state" ))

            .agg(

sum( "quantity" ),

callUDF( "pointAttribution" , col( "quantity" )).as( "point" ));

      pointDf .show(20);

    }

}

调用UDAF就像这样简单:

callUDF( "pointAttribution" , col( "quantity" ))

在这种情况下,您的UDAF只接受一个参数,但如果有必要,函数可以接受多个参数。如果UDAF需要更多的参数,只需添加这些参数:将它们添加到您的调用和输入模式中(参见清单15.24)。

在深入代码之前,让我们先理解UDAF的架构。每一行将被处理,结果可以存储在一个聚合缓冲区中(在worker点上)。请注意,缓冲区不必反映传入数据的结构:您将定义它的模式,它可以存储其他元素。图15.7说明了聚合及其结果缓冲区的机制。

图15.7 当你的代码正在分析数据集的每一行时,中间结果可以保存在一个缓冲区中。

现在,让我们看看如何实现UDAF本身。当你的应用程序调用这个函数时,它就是一个函数;然而,当涉及到它的实现时,它是一个完整的类。这个类必须扩展UserDefinedAggregateFunction(在org.apache .spark.sql.expressions包)。

因此,实现UDAF的类必须实现以下方法:

bufferSchema()——定义缓冲区的模式。

dataType()——表示来自聚合函数的数据类型。

deterministic()——当Spark通过分割数据来执行时,它会分别处理数据块并将它们组合在一起。如果UDAF逻辑使结果独立于处理和组合数据的顺序,则UDAF是确定性的。

evaluate()——根据给定的聚合缓冲区计算该UDAF的最终结果。

initialize()——初始化给定的聚合缓冲区。

inputSchema()——描述发送到UDAF的输入的模式。

merge()——合并两个聚合缓冲区并存储更新后的缓冲区值。当我们将两个部分聚合的数据元素合并在一起时,将调用此方法。

update()——用新的输入数据更新给定的聚合缓冲区。每个输入行调用一次此方法。

现在,您拥有了构建UDAF所需的所有元素,如清单15.25所示。注意,这个类继承了UserDefinedAggregateFunction,它实现了Serializable。您将需要定义一个serialVersionUID变量,但最重要的是,该类的每个元素也需要是可序列化的。

//清单15.25关注于UDAF:

inputSchema()方法定义了发送给函数的数据的模式。在本例中,您接收到的是一个整数,表示订单中的原始项数。Spark中的一个模式,你已经用过几次了,是用StructType实现的:

@Override    public StructType inputSchema() {      List<StructField> inputFields = new ArrayList<>();      inputFields .add(            DataTypes.createStructField( "_c0" , DataTypes. IntegerType , true ));      return DataTypes.createStructType( inputFields );     }

bufferSchema()方法定义聚合缓冲区的模式,用于存储中间结果。在本例中,您只需要一列存储整数。对于更复杂的聚合流程,可能需要更多的列。

@OverridepublicStructTypebufferSchema(){List bufferFields =newArrayList<>();      bufferFields .add(DataTypes.createStructField("sum", DataTypes. IntegerType ,true));returnDataTypes.createStructType( bufferFields );    }@OverridepublicDataTypedataType(){returnDataTypes. IntegerType ;    }@Overridepublicbooleandeterministic(){returntrue;    }

很好,initialize()方法初始化内部缓冲区。在这种情况下,由于这是一个相当简单的聚合,缓冲区将被设置为0。

然而,由类履行的契约需要遵循这个基本规则。在两个初始缓冲区上应用merge()方法应该返回初始缓冲区本身;例如,merge(initialBuffer, initialBuffer) = initialBuffer。

@Overridepublicvoidinitialize(MutableAggregationBuffer buffer ){      buffer .update(0,0);    }

该操作发生在update()方法中。您将在这里处理数据。你接收到的缓冲区可能包含数据,也可能不包含数据,所以不能忽略它:在第一次调用中,它将不包含初始化数据以外的数据。然而,在随后的调用中,数据已经在缓冲区中了,所以不应该忽略它:

@Overridepublicvoidupdate(MutableAggregationBuffer buffer , Row input ){...intinitialValue = buffer .getInt(0);intinputValue = input .getInt(0);intoutputValue =0;if( inputValue < MAX_POINT_PER_ORDER ) {        outputValue = inputValue ;}else{        outputValue = MAX_POINT_PER_ORDER ;      }      outputValue += initialValue ;buffer .update(0, outputValue );    }

merge()方法合并两个聚合缓冲区,并将更新后的缓冲区值存储回聚合缓冲区中。在这个场景中,当有两个包含忠诚度点的缓冲区时,只需相加它们:

@Override    public void merge(MutableAggregationBuffer buffer , Row row ) {buffer.update(0,buffer.getInt(0) +row.getInt(0));    }

最后,evaluate()方法根据给定的聚合缓冲区计算这个UDAF的最终结果:

@OverridepublicIntegerevaluate(Row row ){returnrow .getInt(0);    } }

在本节中,您已经使用并构建了自己的用户定义聚合函数,这有点棘手。您遵循的用例是一个简单的忠诚度点归属,但您可以想象其他类型的场景。

如果您有兴趣了解更多关于聚合如何工作的信息,可以在Log4j.properties中激活跟踪日志:

log4j.logger.net.jgp= DEBUG//修改为log4j.logger.net.jgp= TRACE

在下一次执行时,你将得到详细的输出:

...alize(PointAttributionUdaf.java:79):->initialize()-bufferas1row(s)...alize(PointAttributionUdaf.java:79):->initialize()-bufferas1row(s)...pdate(PointAttributionUdaf.java:92):->update(),inputrowhas1args...pdate(PointAttributionUdaf.java:97):->update(0, 1) ...

总结

聚合是一种对数据进行分组的方法,这样您就可以从更高或更宏观的级别查看数据。

Apache Spark可以使用Spark SQL(通过创建一个视图)或dataframe API对dataframe进行聚合。

groupBy()方法等价于SQL GROUP BY语句。

在执行聚合之前,需要准备和清理数据。这些步骤可以通过转换来完成(第12章)。

聚合可以通过groupBy()方法之后链接的方法执行,也可以通过agg()方法内部的静态函数执行。

Spark的聚合可以通过自定义的自定义聚合函数(UDAFs)进行扩展。

一个UDAF必须在你的Spark会话中通过名字注册。

使用callUDF()方法和UDAF名称来调用UDAF。

一个UDAF作为一个类实现,它应该实现几个方法。

使用agg()方法一次对多个列执行聚合。

您可以使用sum()方法和静态函数来计算集合的和。

可以使用avg()方法和静态函数来计算集合的平均值。

可以使用max()方法和静态函数来提取集合的最大值。

可以使用min()方法和静态函数来提取集合的最小值。

其他聚合函数包括许多统计方法,如:approx_count_distinct() , collect_list() , collect_set() , corr() , count() , countDistinct() , covar_pop() , covar_samp() , first() , grouping() , grouping _id() , kurtosis() , last() , mean() , skewness() , stddev() , stddev_pop() , stddev_samp() , sumDistinct() , var_pop() , var_samp() , 和variance()

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
0人点赞
更多精彩内容,就在简书APP
"小礼物走一走,来简书关注我"
还没有人赞赏,支持一下
总资产0共写了3.5W字获得1个赞共13个粉丝

推荐阅读更多精彩内容