在Apache Flink中,我有一串元组.让我们假设一个非常简单的Tuple1< String>.元组可以在其值字段中具有任意值(例如,“P1”,“P2”等).一组可能的值是有限的,但我不知道预先设定的全部(所以可能有一个’P362′).我想将该元组写入某个输出位置,具体取决于元组中的值.例如我想拥有以下文件结构:
> / output / P1
> / output / P2
在文档中,我只发现写入我事先知道的位置的可能性(例如stream.writeCsv(“/ output / athere”)),但是没有办法让数据的内容决定数据实际结束的位置.
我阅读了关于文档中的输出分割,但是这似乎并没有提供一种方法来将输出重定向到不同的目的地,我想要拥有它(或者我不明白这将如何工作).
这可以用Flink API来完成吗?如果没有,是否有可能有第三方图书馆可以做到这一点,或者我必须自己建立一个这样的事情?
更新
按照Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在将序列化之后将元组写入相应的文件.我把它放在这里供参考,也许它对别人有用:
public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> { private final OutputSelector<IT> outputSelector; private final MapFunction<IT,String> serializationFunction; private final String basePath; Map<String,TextOutputFormat<String>> formats = new HashMap<>(); /** * @param outputSelector the selector which determines into which output(s) a record is written. * @param serializationFunction a function which serializes the record to a string. * @param basePath the base path for writing the records. It will be appended with the output selector. */ public SiftingSinkFunction(OutputSelector<IT> outputSelector,MapFunction<IT,String> serializationFunction,String basePath) { this.outputSelector = outputSelector; this.serializationFunction = serializationFunction; this.basePath = basePath; } @Override public void invoke(IT value) throws Exception { // find out where to write. Iterable<String> selection = outputSelector.select(value); for (String s : selection) { // ensure we have a format for this. TextOutputFormat<String> destination = ensureDestinationExists(s); // then serialize and write. destination.writeRecord(serializationFunction.map(value)); } } private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException { // if we know the destination,we just return the format. if (formats.containsKey(selection)) { return formats.get(selection); } // create a new output format and initialize it from the context. TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath,selection)); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); format.configure(context.getTaskStubParameters()); format.open(context.getIndexOfThisSubtask(),context.getNumberOfParallelSubtasks()); // put it into our map. formats.put(selection,format); return format; } @Override public void close() throws IOException { Exception lastException = null; try { for (TextOutputFormat<String> format : formats.values()) { try { format.close(); } catch (Exception e) { lastException = e; format.tryCleanupOnError(); } } } finally { formats.clear(); } if (lastException != null) { throw new IOException("Close Failed.",lastException); } } }
解决方法
你可以实现一个自定义的接收器.从两者之一继承:
> org.apache.flink.streaming.api.functions.sink.SinkFunction
> org.apache.flink.streaming.api.functions.sink.RichSinkFunction
在你的程序中使用:
stream.addSink(SinkFunction<T> sinkFunction);
而不是stream.writeCsv(“/ output / athere”).