PyODPS提供了DataFrameAPI来用类似pandas的接口进行大规模数据分析以及预处理,本文主要介绍如何使用PyODPS执行笛卡尔积的操作。
笛卡尔积最常出现的场景是两两之间需要比较或者运算。以计算地理位置距离为例,假设大表Coordinates1存储目标点经纬度坐标,共有M行数据,小表Coordinates2存储出发点经纬度坐标,共有N行数据,现在需要计算所有离目标点最近的出发点坐标。对于一个目标点来说,我们需要计算所有的出发点到目标点的距离,然后找到最小距离,所以整个中间过程需要产生M*N条数据,也就是一个笛卡尔积问题。
haversine公式首先简单介绍一下背景知识,已知两个地理位置的坐标点的经纬度,求解两点之间的距离可以使用haversine公式,使用Python的表达如下:
defhaversine(lat1,lon1,lat2,lon2):lat2,lon2为位置2的经纬度坐标importnumpyasnpdlon=(lon2-lon1)dlat=(lat2-lat1)a=(dlat/2)**2+((lat1))*((lat2))*(dlon/2)**2c=2*((a))r=6371#usedataframeudfdf1=_table('coordinates1').to_df()df2=_table('coordinates2').to_df()deffunc(collections):importpandasaspdcollection=collections[0]ids=[]latitudes=[]longitudes=[]forrincollection:()()()df=({'id':ids,'latitude':latitudes,'longitude':longitudes})defh(x):df['dis']=haversine(,,,)[df['dis'].idxmin()]['id']returnhdf1[,(func,resources=[df2],axis=1,reduce=True,types='string').rename('min_id')].execute(libraries=['','','',''])在自定义函数中,将表资源通过循环读成pandasDataFrame,利用pandas的loc可以很方便的找到最小值对应的行,从而得到距离最近的出发点id。另外,如果在自定义函数中需要使用到三方包(例如本例中的pandas)可以参考这篇在PyODPSDataFrame自定义函数中使用pandas、scipy和scikit-learn-云栖社区-阿里云。
全局变量
当小表的数据量十分小的时候,我们甚至可以将小表数据作为全局变量在自定义函数中使用。
df1=_table('coordinates1').to_df()df2=_table('coordinates2').to_df()df=_pandas()deffunc(x):df['dis']=haversine(,,,)[df['dis'].idxmin()]['id']df1[,(func,axis=1,reduce=True,types='string').rename('min_id')].execute(libraries=['','','',''])在上传函数的时候,会将函数内使用到的全局变量(上面代码中的df)pickle到UDF中。但是注意这种方式使用场景很局限,因为ODPS的上传的文件资源大小是有限制的,所以数据量太大会导致UDF生成的资源太大从而无法上传,而且这种方式最好保证三方包的客户端与服务端的版本一致,否则很有可能出现序列化的问题,所以建议只在数据量非常小的时候使用。
总结使用PyODPS解决笛卡尔积的问题主要分为两种方式,一种是mapjoin,比较直观,性能好,一般能用mapjoin解决的我们都推荐使用mapjoin,并且最好使用内建函数计算,能到达最高的效率,但是它不够灵活。另一种是使用DataFrame自定义函数,比较灵活,性能相对差一点(可以使用pandas或者numpy获得性能上的提升),通过使用表资源,将小表作为表资源传入DataFrame自定义函数中,从而完成笛卡尔积的操作。