问题描述
我们正在运行在纱线上的生产流作业中运行自定义指标,例如仪表,仪表。
步骤如下:
对pom.xml的附加依赖
@H_502_7@<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-metrics-dropwizard</artifactId> <version>${flink.version}</version> </dependency>
我们正在使用1.2.1版
然后将仪表添加到MyMapper类。
@H_502_7@import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; import org.apache.flink.metrics.Meter; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Test { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .readTextFile("/home/LizardKing/Documents/Power/Prova.csv") .map(new MyMapper()) .writeAsCsv("/home/LizardKing/Results.csv"); JobExecutionResult res = env.execute(); } private static class MyMapper extends RichMapFunction<String, Object> { private transient Meter meter; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.meter = getRuntimeContext() .getMetricGroup() .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter())); } @Override public Object map(String value) throws Exception { this.meter.markEvent(); return value; } } }
希望这可以帮助 。
解决方法
我为Apache
Flink写了一个非常简单的Java程序,现在我对测量统计信息感兴趣,例如吞吐量(每秒处理的元组数)和等待时间(程序需要处理每个输入元组的时间)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
.map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv");
JobExecutionResult res = env.execute();
我知道Flink公开了一些指标:
https://ci.apache.org/projects/flink/flink-docs-
release-1.2/monitoring/metrics.html
但是我不确定如何使用它们来获取我想要的东西。从链接中我已经读到“仪表”可以用来测量平均吞吐量,但是在定义后,我应该如何使用它?