> LHSPipe – (the larger dataset)
> RHSPipes – (smaller datasets that could fit in memory)
The pseudocode is below; this example involves two joins:
```
If F1DecidingFactor = YES then
    Join LHSPipe with RHS Lookup#1 BY (LHSPipe.F1Input = RHS Lookup#1.Join#F1)
    and set the lookup result (SET LHSPipe.F1Output = Result#F1)
Else
    SET LHSPipe.F1Output = N/A
```
The same logic applies to the F2 computation.
The expected output depends on this IF-ELSE deciding whether the join happens at all, which is what forces me into a custom join operation.
Given the above, I want to go with a map-side join (keeping the RHSPipes in memory on the map-task nodes). I am considering the possible solutions below, each with its own pros and cons, and would like your advice on them.
Option 1:
CoGroup – we can build the custom join logic with a CoGroup, a BufferJoin joiner and a custom join operation (Buffer), but it does not guarantee a map-side join (a rough sketch of this approach follows after option 2).
Option 2:
HashJoin – it guarantees a map-side join, but as far as I can see there is no way to plug custom join logic into it.
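For illustration, a custom join over a CoGroup might look roughly like the sketch below. This is only a sketch of the approach described in option 1; the class name ConditionalJoinSketch, the ConditionalJoinBuffer skeleton and the field names are assumptions, not code from the question. The BufferJoin joiner hands both sides of the co-group to a custom Buffer through the JoinerClosure, which is what makes the join logic fully customizable, but the CoGroup itself still runs as a reduce-side join.

```java
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Every;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.BufferJoin;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public class ConditionalJoinSketch {

    // Custom join operation: receives both sides of the co-group and decides what to emit.
    public static class ConditionalJoinBuffer extends BaseOperation<Void> implements Buffer<Void> {

        private static final long serialVersionUID = 1L;

        public ConditionalJoinBuffer() {
            super(new Fields("F1Output")); // fields emitted by the custom join
        }

        @Override
        public void operate(@SuppressWarnings("rawtypes") FlowProcess flowProcess, BufferCall<Void> bufferCall) {
            // With a BufferJoin the joined streams are reached through the JoinerClosure:
            // position 0 is the LHS pipe, position 1 the RHS lookup pipe.
            Iterator<Tuple> lhs = bufferCall.getJoinerClosure().getIterator(0);
            Iterator<Tuple> rhs = bufferCall.getJoinerClosure().getIterator(1);

            // The IF-ELSE logic from the pseudocode would go here:
            // emit the RHS result when the deciding factor is "Yes", otherwise emit "N/A".
            while (lhs.hasNext()) {
                lhs.next();
                // ... inspect the LHS tuple and the rhs iterator, then emit the chosen value ...
                bufferCall.getOutputCollector().add(new Tuple("N/A"));
            }
        }
    }

    // Reduce-side join: co-group on the join keys and let the Buffer implement the logic.
    public static Pipe assemble(Pipe lhsPipe, Pipe rhsLookupOne) {
        Pipe joined = new CoGroup(lhsPipe, new Fields("F1Input"), rhsLookupOne, new Fields("F1Join"), new BufferJoin());
        return new Every(joined, new ConditionalJoinBuffer(), Fields.RESULTS);
    }
}
```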
Please correct my understanding and share your suggestions for handling this requirement.
Thanks in advance.
Solution
Pseudo code
```
if F1DecidingFactor == "Yes" then F1Result = ACTUAL_VALUE else F1Result = "N/A"
```
Result table
| F1#Join | F1#Result | F1#DecidingFactor |
|---------|-----------|-------------------|
| Yes     | 0         | True              |
| Yes     | 1         | False              |
| No      | 0         | N/A               |
| No      | 1         | N/A               |
You can do the above with Cascading as well.
After that you can do the map-side join.
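As a minimal sketch of that pre-processing step in Cascading: the ExpressionFunction, the field name F1FinalResult and the assumption that the deciding factor is already a field on the small pipe are mine for illustration, not part of the original answer. It expresses the pseudocode above as an Each that appends the conditional result:

```java
import cascading.operation.expression.ExpressionFunction;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

public class PrepareSmallDatasetSketch {

    public static Pipe prepare(Pipe small) {
        // if F1DecidingFactor == "Yes" then keep F1Result, else use "N/A"
        ExpressionFunction setResult = new ExpressionFunction(
                new Fields("F1FinalResult"),
                "\"Yes\".equals(F1DecidingFactor) ? F1Result : \"N/A\"",
                new String[] { "F1DecidingFactor", "F1Result" },
                new Class[] { String.class, String.class });

        // Appends F1FinalResult next to the existing fields; the original F1Result
        // could be discarded or renamed afterwards if it is no longer needed.
        return new Each(small, new Fields("F1DecidingFactor", "F1Result"), setResult, Fields.ALL);
    }
}
```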
If modifying the smaller dataset is not possible, then I have two options to solve the problem.
Option 1
Add a new field to your small pipe that is equivalent to your deciding factor (i.e. F1DecidingFactor_RHS = Yes), then include it in your join criteria. Once the join is done you will only have result values for the rows where those conditions matched; the rest will be null/blank. Sample code:
Main class
```java
import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {

    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");
        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);
        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // New field added to the small pipe. Expected fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // New field added to the small pipe. Expected fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo, new Insert(f2DecidingFactorRhs, "Yes"), Fields.ALL);

        // Joining the first small pipe. Expected fields after the join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin());

        // Joining the second small pipe. Expected fields after the join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin());

        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);
        result = new Discard(result, f1DecidingFactorRhs);
        result = new Discard(result, f2DecidingFactorRhs);

        // The result pipe should now hold the expected output.
    }
}
```
Option 2
If you want a default value instead of null/blank, then I suggest doing the HashJoin with the default joiners first and then using a function to update the tuples with the appropriate values. Something like:
Main class
```java
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {

    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Joining the first small pipe. Expected fields after the join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin());

        // Joining the second small pipe. Expected fields after the join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin());

        // Update the joined tuples with the default value where the deciding factor is "No".
        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);

        // The result pipe should now hold the expected output.
    }
}
```
Update function
```java
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected argument fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"
    public TestFunction() {
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();
        TupleEntry result = new TupleEntry(arguments);

        // Overwrite the output fields with the default value when the deciding factor is "No".
        if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F1Output", DEFAULT_VALUE);
        }
        if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F2Output", DEFAULT_VALUE);
        }

        call.getOutputCollector().add(result);
    }
}
```
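For completeness, wiring one of the assemblies above into a runnable flow might look roughly like this; the taps, paths, TextDelimited scheme and the run method are assumptions for illustration and are not part of the original answer:

```java
import java.util.Properties;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

public class FlowWiringSketch {

    // largePipe, rhsOne and rhsTwo are the source pipes from the main class; result is its tail pipe.
    public static void run(Pipe largePipe, Pipe rhsOne, Pipe rhsTwo, Pipe result) {
        Tap largeTap = new Hfs(new TextDelimited(true, "\t"), "input/large");
        Tap rhsOneTap = new Hfs(new TextDelimited(true, "\t"), "input/small-1");
        Tap rhsTwoTap = new Hfs(new TextDelimited(true, "\t"), "input/small-2");
        Tap sinkTap = new Hfs(new TextDelimited(true, "\t"), "output/result");

        FlowDef flowDef = FlowDef.flowDef()
                .setName("conditional-hash-join")
                .addSource(largePipe, largeTap)
                .addSource(rhsOne, rhsOneTap)
                .addSource(rhsTwo, rhsTwoTap)
                .addTailSink(result, sinkTap);

        new HadoopFlowConnector(new Properties()).connect(flowDef).complete();
    }
}
```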
References
> Insert Function
> Custom Function
> HashJoin
This should solve your problem. Let me know if it helps.