我想写一个Kiba Etl脚本,它有一个从CSV到目标CSV的源,带有一个转换规则列表,其中第二个转换器是一个聚合,其中的操作如选择名称,sum(euro)group by name
Kiba ETL脚本文件
source CsvSource,'users.csv',col_sep: ';',headers: true,header_converters: :symbol transform VerifyFieldsPresence,[:name,:euro] transform AggregateFields,{ sum: :euro,group_by: :name} transform RenameField,from: :euro,to: :total_amount destination CsvDestination,'result.csv',:total_amount]
users.csv
date;euro;name 7/3/2015;10;Jack 7/3/2015;85;Jill 8/3/2015;6;Jack 8/3/2015;12;Jill 9/3/2015;99;Mack
result.csv(预期结果)
total_amount;name 16;Jack 97;Jill 99;Mack
由于etl变换器一次在一行上一个接一个地执行,但我的第二个变换器行为依赖于我无法在传递给transform方法的类中访问它的整个行集合.
transform AggregateFields,group_by: :name }
是否有可能使用kiba gem实现这种行为
先感谢您
解决方法
木巴作者在这里!您可以通过多种方式实现这一目标,主要取决于数据大小和实际需求.这里有几种可能性.
使用Kiba脚本中的变量进行聚合
require 'awesome_print' transform do |r| r[:amount] = BigDecimal.new(r[:amount]) r end total_amounts = Hash.new(0) transform do |r| total_amounts[r[:name]] += r[:amount] r end post_process do # pretty print here,but you could save to a CSV too ap total_amounts end
这是最简单的方法,但这非常灵活.
它会将你的聚合内容保存在内存中,所以这可能是好的或不是,取决于你的场景.请注意,目前Kiba是单线程的(但“Kiba Pro”将是多线程的),因此现在无需为聚合添加锁或使用线程安全结构.
从post_process块调用TextQL
另一种快速简便的聚合方法是首先生成非聚合的CSV文件,然后利用TextQl实际进行聚合,如下所示:
destination CsvSource,'non-aggregated-output.csv',:amount] post_process do query = <<sql select name,/* apparently sqlite has reduced precision,round to 2 for now */ round(sum(amount),2) as total_amount from tbl group by name sql textql('non-aggregated-output.csv',query,'aggregated-output.csv') end
定义了以下助手:
def system!(cmd) raise "Failed to run command #{command}" unless system(command) end def textql(source_file,output_file) system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}" # this one uses csvfix to pretty print the table system! "cat #{output_file} | csvfix ascii_table" end
在进行计算时要小心精度.
编写内存中的聚合目标
这里可以使用的一个有用的技巧是用一个类包装一个给定的目标来进行聚合.这是它的样子:
class InMemoryAggregate def initialize(sum:,group_by:,destination:) @aggregate = Hash.new(0) @sum = sum @group_by = group_by # this relies a bit on the internals of Kiba,but not too much @destination = destination.shift.new(*destination) end def write(row) # do not write,but count here instead @aggregate[row[@group_by]] += row[@sum] end def close # use close to actually do the writing @aggregate.each do |k,v| # reformat BigDecimal additions here value = '%0.2f' % v @destination.write(@group_by => k,@sum => value) end @destination.close end end
你可以用这种方式:
# convert your string into an actual number transform do |r| r[:amount] = BigDecimal.new(r[:amount]) r end destination CsvDestination,'non-aggregated.csv',:amount] destination InMemoryAggregate,sum: :amount,group_by: :name,destination: [ CsvDestination,'aggregated.csv',:amount] ] post_process do system!("cat aggregated.csv | csvfix ascii_table") end
这个版本的好处是你可以将聚合器重用于不同的目的地(比如数据库或其他任何目的地).
请注意,这会将所有聚合保留在内存中,就像第一个版本一样.
插入具有聚合功能的商店
另一种方法(如果你有非常大的卷,特别有用)是将结果数据发送到能够为你聚合数据的东西.它可以是一个常规的sql数据库,Redis,或任何更奇特的东西,然后您可以根据需要进行查询.
正如我所说,实施将在很大程度上取决于您的实际需求.希望你能找到适合你的东西!