此外,我想要有两个(以及以后)消费者在不同的任务上工作并在不同的机器上运行. (目前我只有一个,需要扩大它)
我们来命名这些任务(只是例子):
FIBONACCI(产生斐波那契数)
> RANDOMBOOKS(生成随机句子来写一本书)
这些任务长达2-3个小时,应平均分配给每个消费者.
每个消费者都可以使用x个并行线程来处理这些任务.
所以我说:(这些数字只是例子,将被变量所取代)
>机器1可以为FIBONACCI消耗3个并行作业,为RANDOMBOOKS消耗5个并行作业
>机器2可以为FIBONACCI消耗7个并行作业,并为RANDOMBOOKS消耗3个并行作业
我该如何实现?
我必须启动每个频道的X线程来听每个消费者?
我什么时候要确认?
我目前只针对一个消费者的方法是:为每个任务启动x线程 – 每个线程都是实现Runnable的Defaultconsumer.在handleDelivery方法中,我调用basicAck(deliveryTag,false)然后执行工作.
进一步:我想把一些任务送给一个特殊的消费者.如何结合上述公平分配实现?
这是我的出版代码
String QUEUE_NAME = "FIBONACCI"; Channel channel = this.clientManager.getRabbitMQConnection().createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,null); channel.basicpublish("",QUEUE_NAME,MessageProperties.BASIC,Control.getBytes(this.getArgument())); channel.close();
这是我的消费者代码
public final class Worker extends DefaultConsumer implements Runnable { @Override public void run() { try { this.getChannel().queueDeclare(this.jobType.toString(),null); this.getChannel().basicConsume(this.jobType.toString(),this); this.getChannel().basicQos(1); } catch (IOException e) { // catch something } while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { Control.getLogger().error("Exception!",e); } } } @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] bytes) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); this.getChannel().basicAck(deliveryTag,false); // Is this right? // Start new Thread for this task with my own ExecutorService } }
在这种情况下,工作人员启动了两次:一次为FIBUNACCI,一次为RANDOMBOOKS
UPDATE
正如答案所述,RabbitMQ不是最好的解决方案,但是Couchbase或MongoDB pull方法将是最好的.我对这些系统很新,有没有人可以向我解释,这将如何实现?
解决方法
>您有一些机器来处理作业,还有一些机器(可能是相同的)创建作业.
>您可以为couchbase中的一个桶中的每个作业创建文档(并将其类型设置为“作业”,或者将其与其中的其他数据进行混合).
>每个作业描述以及要完成的具体命令可能包括创建时间,到期时间(如果有特定的时间到期)和某种生成的工作值.这个工作价值将是任意的单位.
每个消费者的工作都会知道一次可以做多少单位,有多少单位可以使用(因为其他工作人员可能正在工作).
所以一台机器,说有10个工作单位,有6个工作单位,可以查询4个工作单位或更少的工作.
>在couchbase有看法是增量更新的地图/减少工作,我想你只需要这里的地图阶段.您可以编写一个视图,让您可以及时查询系统输入的时间和工作单元的数量.这样就可以得到“4个以上工作单位的工作最晚”.
>这种查询,随着容量的释放,将首先获得最多的工作,虽然你可以获得最大的逾期工作,如果没有,那么最大的未逾期的工作. (“过期”是当前时间与工作到期日之间的差额)
> Couchbase视图允许这样非常复杂的查询.而当它们逐渐更新时,它们并不完全是实时的.因此,你不会找一份工作,而是一份工作候选人名单.
>所以,下一步将是获取候选人列表,并检查第二个位置 – 可能是一个锁文件的膜盒(例如:RAM缓存,非持久性).锁定文件将有多个阶段(在这里,您可以使用CRDT进行一些分区解析逻辑或任何方法最适合您的需求).
>由于这个桶是基于ram的,它比视图更快,并且从总状态的滞后更少.如果没有锁文件,则创建一个状态标志为“临时”的.
>如果另一个工作人员获得相同的工作,并看到该锁定文件,那么它可以跳过该候选人,然后在列表中执行下一个.
> IF,不知何故两名工作人员尝试为同一工作创建锁文件,会有冲突.在冲突的情况下,你可以平静下来.或者您可以有逻辑,每个工作人员更新锁定文件(CRDT分辨率,使得这些幂等于兄弟姐妹可以合并)可能会放置一个随机数或一些优先级数字.
>经过指定的时间(大概是几秒钟),工作人员检查锁定文件,如果没有进行任何种族分辨率更改,它将锁定文件的状态从“临时”更改为“采取”
>然后它更新作业本身的状态为“已采取”或某些这样的,以便在其他工作人员正在寻找可用的作业时不会出现在视图中.
>最后,您需要添加另一个步骤,在进行查询之前,要获得上述我所做的这些求职者,您可以特别查询找到被占用的工作,但涉及的工作已经死亡. (例如:逾期的工作).
知道工作人员何时死亡的一种方法是,放入膜桶中的锁定文件应该有一个到期时间,最终会使其消失.可能这个时间可能很短,工作人员只需触摸它更新过期(这在couchbase API中是支持的)
>如果工作人员死亡,最终其锁定文件将消失,孤立的工作将被标记为“已取消”,但没有锁定文件,这是寻找工作的工作人员可以查找的条件.
因此,总而言之,每个工作人员对孤立的作业进行查询,如果有任何检查,依次查看是否有一个锁定文件,则创建一个,并按照上述正常的锁定协议.如果没有孤立的作业,那么它会查找逾期的作业,并遵循锁定协议.如果没有过期的工作,那么它只需要最旧的工作并遵循锁定协议.
当然,如果您的系统没有“逾期”这样的事情,并且如果及时性并不重要,那么这样做也可以,而不是使用最旧的工作,您可以使用其他方法.
另一种方法可能是创建1-N之间的随机值,其中N是相当大的数字,例如4×工作人员的数量,并且每个作业都被标记为该值.每次工作人员要找工作时,都可以滚动骰子,看看是否有任何这样的工作.如果没有,它会再次这样做,直到找到一个有该号码的工作.这样,而不是多个争夺少数“最旧”或最高优先级的工作的工人,而不是更多的锁争用的可能性,它们将被分散出来.以牺牲时间为代价比FIFO情况更随机.
随机方法也可以应用于需要容纳负载值的情况下(因为单个机器不承担太多的负载),而不是采用最老的候选者,只需要随机选择一个列出可行的工作,并尝试这样做.
编辑添加:
在第12步中,我说“可能会放置一个随机数”,我的意思是,如果工作人员知道优先级(例如:最需要做哪些工作),可以将一个代表这个的数字放入文件中.如果没有“需要”工作的概念,那么他们都可以滚动骰子.他们用骰子的角色来更新这个文件.然后他们都可以看看它,看看其他的滚动.如果他们失去了,那么他们会平息,另一个工作人员知道它有它.这样,您可以解决哪位工作人员无需复杂的协议或协商工作.我假设两个工作人员都在这里碰到相同的锁定文件,它可以使用两个锁定文件和一个查找所有这些文件来实现.如果经过一段时间后,没有一个工人滚过一个更高的数字(新工人想到他的工作会知道别人已经在滚动,所以他们会跳过它)你可以安全地认识你是唯一的工作人员努力工作