以下是列表中项目的示例:
1 2 1 3 2 4 3 4 3 5 4 1 4 5 4 6 5 6
从上面的示例图将是这样的
在上面的简化示例中,例如节点1的路径是
α[1 – > 2 – > 4 – > 1],[1 – > 2 – > 4 – > 5],[1 – > 2 – > 4 – > 6],[1 – > 3 – > 4 – > 1],
[1 – > 3 – > 4 – > 5],[1 – > 3 – > 4 – > 6]και[1 – > 3 – > 5 – > 6]
因此地图缩小将为节点1输出顶点1,5,6(
(a)每个顶点只能计算一次,并且
(b)我们仅在存在长度为3的圆形路径时包括当前顶点,如示例[1 – > 2 – > 4 – > 1]和[1 – > 3 – > 4 – > 1].
我很失落,因为我相信这需要图论和算法的知识,而且我们没有被教过任何与之相关的知识.
如果有人能给我正确的方向开始,我将不胜感激. (我已经研究过最短路径理论,但我不确定它是否会对这个特定的练习有用)
在此先感谢,并有一个愉快的假期.
编辑
我尝试创建adjucency列表,但我希望输出为“vertexID”“node1 node2 node3 node4 …”我看到在输出文件中我的reducer将每个顶点Id的列表拆分为三对.
例如,如果我有一个连接到Z,X,C,V,B,N,M,G,H,J,K,L的顶点A我将它输出为
Z,C
A V,N
A M,H
A J,L
下面是我的mapper和reducer
public class AdjacentsListDriver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = Job.getInstance(conf); job.setJobName("Test driver"); job.setJarByClass(AdjacentsListDriver.class); String[] arg0 = new GenericOptionsParser(conf,args).getRemainingArgs(); if (arg0.length != 2) { System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>"); System.exit(1); } Path in = new Path(arg0[0]); Path out = new Path(arg0[1]); FileInputFormat.setInputPaths(job,in); FileOutputFormat.setOutputPath(job,out); job.setMapperClass(ListMapper.class); job.setReducerClass(ListReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(),new AdjacentsListDriver(),args); System.exit(res); } } /** * @author George * Theoretically this takes a key(vertexID) and maps all nodes that are connected to it in one hop.... * */ public class ListMapper extends Mapper<LongWritable,Text,Text> { private Text vertexID = new Text(); //private LongWritable vertice= new LongWritable(0); private Text vertice=new Text(); public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line,"\n"); StringTokenizer itrInside; //vertice=new LongWritable(Long.valueOf(value.toString()).longValue()); while (itr.hasMoreTokens()) { if(itr.countTokens() > 2){ }//ignore first line ?? else{ itrInside=new StringTokenizer(itr.toString()); vertexID.set(itr.nextToken()); while(itrInside.hasMoreTokens()){ vertice.set(itrInside.nextToken()); context.write(vertexID,value); } } } } } @override public class ListReducer extends Reducer<Text,Text>{ public void reduce(Text key,Iterable<Text> values,InterruptedException { String vertices=""; for (Text value : values) { if(vertices=="") vertices+=value.toString(); else vertices=vertices+","+value.toString(); } Text value=new Text(); value.set(vertices); context.write(key,value); } }
对于每个顶点,您希望正好是n跳的顶点.在查看最短路径算法时,您在正确的路径上,但通过简单的广度优先搜索可以更轻松地解决它.
但是,在使用MapReduce处理图形时,您需要更深入地了解顶点之间的消息传递.图算法通常用多个作业表示,其中map和reduce阶段具有以下赋值:
> Mapper:向另一个顶点发送消息(通常用于顶点的每个邻居)
> Reducer:对传入的消息进行分组,加入核心图并减少它们,有时会发送更多消息.
每个作业将始终按照上一个作业的输出进行操作,直到您达到结果或放弃为止.
数据准备
在您真正想要运行图算法之前,请确保您的数据采用邻接列表的形式.它将使以下迭代算法更容易实现.
因此,您需要按顶点id对它们进行分组,而不是邻接元组.这是一些伪代码:
map input tuple (X,Y): emit (X,Y) reduce input (X,Y[]) : emit (X,Y[])
基本上你只是按顶点id进行分组,所以你的输入数据是它的邻居的一个键(顶点id)(可以从那个特定的键顶点id到达的顶点id列表).如果要节省资源,可以将reducer用作组合器.
算法
就像我已经提到的那样,你只需要一个广度优先的搜索算法.您将为图中的每个顶点执行广度优先搜索算法,当命中邻居时,您将只增加一个跳数计数器,该计数器告诉我们离开头顶点的距离(这是最短路径算法的最简单情况,即当边缘重量为1)时.
让我向您展示一个简单的图片,用简单的图表描述它.橙色意味着访问,蓝色未访问,绿色是我们的结果.括号中是跳数计数器.
你看,在每次迭代中我们都为每个顶点设置一个hopcounter.如果我们点击一个新的顶点,我们只会将它增加一个.如果我们到达第n个顶点,我们将以某种方式将其标记为以后查找.
使用MapReduce进行分发
虽然对每个顶点进行广度优先搜索似乎非常浪费,但我们可以通过并行化来做得更好.消息传递到游戏中.就像上面的图片一样,我们在映射步骤中得到的每个顶点最初都会向包含以下有效负载的邻居发送一条消息:
HopMessage: Origin (VertexID) | HopCounter(Integer)
在第一次迭代中,我们将尝试将消息发送给邻居以启动计算.否则,我们只会代理图表或传入消息.
因此,在数据准备完成后的第一份工作中,map和reduce看起来像这样:
map input (VertexID key,either HopMessage or List<VertexID> adjacents): if(iterationNumber == 1): // only in the first iteration to kick off for neighbour in adjacents: emit (neighbour,new HopMessage(key,0)) emit (key,adjacents or HopMessage) // for joining in the reducer
reducer现在在图形和消息之间进行简单连接,主要是为了获得顶点的邻居,导致输入(采用我的简单图形):
1 2 // graph 2 1 // hop message 2 3 // graph 3 1 // hop message 3 4 // graph 4 1 // hop message 4 - // graph
在reducer步骤中,我们将再次将消息转发给邻居,并在递增后检查跳计数器是否已经达到3.
reducer input(VertexID key,List<either HopMessage or List<VertexID> neighbours> values): for hopMessage in values: hopMessage.counter += 1 if (hopMessage.counter == 3) emit to some external resource (HopMessage.origin,key) else for neighbour in neighbours of key: emit (neighbour,hopMessage) emit (key,neighbours)
正如您所看到的,它可能会变得非常混乱:您需要管理两种不同类型的消息,然后还要写入一些外部资源,这些资源将跟踪正好3跳的顶点.
只要有要发送的HopMessages,就可以安排迭代的作业.这很容易出现图中循环的问题,因为在这种情况下你将无限增加hopcounter.所以我建议到目前为止发送完整的遍历路径与每条消息(非常浪费)或者只是限制要进行的迭代次数.在n = 3的情况下,不需要超过3个作业迭代.
有很多博客和项目可以为您提供有关如何处理Hadoop中每个问题的示例.至少我在我的博客中写过MapReduce中的图形运算,你可以在我的github上找到一些例子.
清理输出数据
最后,您将拥有一堆包含顶点的文件 – >顶点映射.您可以像在准备中一样减少它们.
使用Pregel的Niftier方式
处理图形的一种不那么繁琐的方法是使用表达图形计算的Pregel方法. Pregel正在使用以顶点为中心的模型,使表达这种广度优先计算变得更加容易.
以下是使用Apache Hama的上述算法的简单示例:
public static class NthHopVertex extends Vertex<Text,NullWritable,HopMessage> { @Override public void compute(Iterable<HopMessage> messages) throws IOException { if (getSuperstepCount() == 0L) { HopMessage msg = new HopMessage(); msg.origin = getVertexID().toString(); msg.hopCounter = 0; sendMessageToNeighbors(msg); } else { for (HopMessage message : messages) { message.hopCounter += 1; if (message.hopCounter == 3) { getPeer().write(new Text(message.origin),getVertexID()); voteToHalt(); } else { sendMessageToNeighbors(message); } } } } }
BTW在您的示例中创建的新图形如下所示:
1=[1,6] 2=[2,3,6] 3=[2,6] 4=[4,5]
以下是完整的Hama Graph实现:
https://github.com/thomasjungblut/tjungblut-graph/blob/master/src/de/jungblut/graph/bsp/NthHop.java