我需要以下建议:
我有一个@Scheduled服务方法,它有一个几秒钟的fixedDelay,它可以扫描一个工作队列并在找到任何工作队列时处理合适的工作.在同一个服务中,我有一个方法将工作放入工作队列,我希望这个方法在完成后立即触发队列扫描(因为我确信现在有一些工作要做扫描器)为了避免计划踢的延迟(因为这可能是秒,时间有点关键).
任务执行和Scheaduling子系统的“现在触发”功能将是理想的,也可以在手动启动执行后重置fixedDelay(因为我不希望我的手动执行与计划的冲突).注意:队列中的工作可以来自外部源,因此需要定期扫描.
欢迎任何建议
编辑:
队列存储在基于文档的数据库中,因此本地基于队列的解决方案不合适.
我不太满意的解决方案(不喜欢使用原始线程)会是这样的:
@Service
public class MyProcessingService implements ProcessingService {
Thread worker;
@PostCreate
public void init() {
worker = new Thread() {
boolean ready = false;
private boolean sleep() {
synchronized(this) {
if (ready) {
ready = false;
} else {
try {
wait(2000);
} catch(InterruptedException) {
return false;
}
}
}
return true;
}
public void tickle() {
synchronized(this) {
ready = true;
notify();
}
}
public void run() {
while(!interrupted()) {
if(!sleep()) continue;
scan();
}
}
}
worker.start();
}
@PreDestroy
public void uninit() {
worker.interrup();
}
public void addWork(Work work) {
db.store(work);
worker.tickle();
}
public void scan() {
List
在这种情况下,您需要的是消费者 – 生产者队列.一个或多个生产者放入工作项和消费者的队列将项目从队列中取出并处理它们.你想要的是BlockingQueue.它们可以用于以线程安全的方式解决消费者 – 生产者问题.
您可以拥有一个执行当前@Scheduled方法执行的任务的Runnable
.
public class SomeClass {
private final BlockingQueue
而其他一些Runnable或其他类可能会放入如下项目:
public class WorkCreator {
@Autowired
private SomeClass workerClass;
@Override
public void run() {
// produce work
Work work = new Work();
workerClass.getWorkQueue().offer(work);
}
}
我想这是解决你手头问题的正确方法.您可以拥有多种变体/配置,只需查看java.util.concurrent
包即可.
编辑完问题后更新
即使外部源是数据库,它仍然是生产者 – 消费者问题.每当在db中存储数据时,您都可以调用scan()方法,而scan()方法可以将从db中检索的数据放入BlockingQueue.
解决重置fixedDelay的实际问题
除非您自己处理调度部分,否则实际上这不可能与Java或Spring一起使用.现在还没有触发器功能.如果您可以访问正在执行任务的Runnable,您可以自己调用run()方法.但这与从任何地方自己调用处理方法都是一样的,你并不需要Runnable.
另一种可行的解决方法
private Lock queueLock = new ReentrantLock();
@Scheduled
public void findNewWorkAndProcess() {
if(!queueLock.tryLock()) {
return;
}
try {
doWork();
} finally {
queueLock.unlock();
}
}
void doWork() {
List
插入任何新数据时都会调用newDataInserted().如果计划的执行正在进行中,它将等待它完成然后再进行工作.这里对lock()的调用是阻塞的,因为我们知道数据库中有一些工作,并且在插入工作之前可能已经调用了调度调用.调用以非阻塞方式获取findNewWorkAndProcess()中的锁定,因为如果已经通过newDataInserted方法获取了锁,则意味着不应该执行调度方法.
好吧,你可以随意微调.