java – 处理“重复”任务的线程池

前端之家收集整理的这篇文章主要介绍了java – 处理“重复”任务的线程池前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我希望并行执行一些不同的任务,但有一个概念,即如果一个任务已经排队或正在处理,它将不会被重新排队.我已经阅读了一些 Java API,并提出了下面的代码,这似乎有效.
任何人都可以了解我使用的方法是否是最好的方法.有任何危险(线程安全?)或更好的方法吗?
代码如下:
import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestExecution implements Runnable {
   String key1;
   String key2;   
   static HashMap<TestExecution,Future<?>> executions = new HashMap<TestExecution,Future<?>>();
   static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
   static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2,5,1,TimeUnit.MINUTES,q);

   public static void main(String[] args) {
      try {
         execute(new TestExecution("A","A"));
         execute(new TestExecution("A","A"));
         execute(new TestExecution("B","B"));
         Thread.sleep(8000);
         execute(new TestExecution("B","B"));
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   static boolean execute(TestExecution e) {
      System.out.println("Handling "+e.key1+":"+e.key2);
      if (executions.containsKey(e)) {
         Future<?> f = (Future<?>) executions.get(e);
         if (f.isDone()) {
            System.out.println("PrevIoUs execution has completed");
            executions.remove(e);
         } else {
            System.out.println("PrevIoUs execution still running");
            return false;
         }         
      }
      else {
         System.out.println("No prevIoUs execution");
      }
      Future<?> f = tpe.submit(e);
      executions.put(e,f);            
      return true;
   }

   public TestExecution(String key1,String key2) {
      this.key1 = key1;
      this.key2 = key2;      
   }

   public boolean equals(Object obj)
   {
       if (obj instanceof TestExecution)
       {
          TestExecution t = (TestExecution) obj;
           return (key1.equals(t.key1) && key2.equals(t.key2));           
       }       
       return false;
   }

   public int hashCode ()
   {
      return key1.hashCode()+key2.hashCode();
   }

   public void run() {      
      try {
         System.out.println("Start processing "+key1+":"+key2);
         Thread.sleep(4000);
         System.out.println("Finish processing "+key1+":"+key2);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }      
   }              
}

跟进以下评论
计划是触发执行任务将由cron调用RESTful Web服务来处理.例如,下面是每天9:30触发的一项任务的设置,以及每两分钟安排一次的任务.

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22

在这种情况下,如果任务key11:key12正在运行,或已经排队等待运行,我不想排队另一个实例.我知道我们还有其他的调度选项,但是我们倾向于将cron用于其他任务,所以我想尝试保留它.

第二次更新.至于回复评论到目前为止,我已重新编写代码,您是否可以评论以下更新解决方案的任何问题?

import java.util.concurrent.LinkedBlockingQueue;

public class TestExecution implements Runnable {
   String key1;
   String key2;      
   static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());

   public static void main(String[] args) {
      try {
         tpe.execute(new TestExecution("A","A"));
         tpe.execute(new TestExecution("A","A"));
         tpe.execute(new TestExecution("B","B"));
         Thread.sleep(8000);
         tpe.execute(new TestExecution("B","B"));
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   public TestExecution(String key1,String key2) {
      this.key1 = key1;
      this.key2 = key2;      
   }

   public boolean equals(Object obj)
   {
       if (obj instanceof TestExecution)
       {
          TestExecution t = (TestExecution) obj;
           return (key1.equals(t.key1) && key2.equals(t.key2));           
       }       
       return false;
   }

   public int hashCode ()
   {
      return key1.hashCode()+key2.hashCode();
   }

   public void run() {      
      try {
         System.out.println("Start processing "+key1+":"+key2);
         Thread.sleep(4000);
         System.out.println("Finish processing "+key1+":"+key2);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }      
   }
}


import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class TestThreadPoolExecutor extends ThreadPoolExecutor {
   Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());

   public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {      
      super(2,q);      
   }

   public void execute(Runnable command) {
      if (executions.contains(command)) {
         System.out.println("PrevIoUs execution still running");
         return;
      }
      else {
         System.out.println("No prevIoUs execution");
      }
      super.execute(command);      
      executions.add(command);      
   }

   protected void afterExecute(Runnable r,Throwable t) {
      super.afterExecute(r,t);        
      executions.remove(r);
   }      
}

解决方法

几条评论

>在execute-method中,如果多个线程同时调用方法,则会在读取“executions”(containsKey)和写入(remove或put)之间出现争用条件.您需要将所有调用包装在“synchronized执行”中,这些执行应该是synchronized-block中的原子. (在您的情况下,使方法同步将起作用)http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>您应该使用单例而不是静态(即全局)变量来处理状态

但我真的想了解一下你的设计,了解你想要实现的目标.为什么任务会多次排队等待执行?

猜你在找的Java相关文章