I wanted to try out ForkJoinPool in Java 8, so I wrote a small program that searches a given directory for all files whose names contain a specific keyword.
Program:
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total no of files with hello" + files.size());
    }
}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {

    private String path;

    public FileSearchRecursiveTask(String path) {
        this.path = path;
    }

    @Override
    protected List<String> compute() {
        File mainDirectory = new File(path);
        List<String> filteredFileList = new ArrayList<>();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        if (mainDirectory.isDirectory()) {
            System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
            if (mainDirectory.canRead()) {
                File[] fileList = mainDirectory.listFiles();
                for (File file : fileList) {
                    System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                    if (file.isDirectory()) {
                        // One task is forked for every subdirectory, without any limit.
                        FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                        recursiveTasks.add(task);
                        task.fork();
                    } else {
                        if (file.getName().contains("hello")) {
                            System.out.println(file.getName());
                            filteredFileList.add(file.getName());
                        }
                    }
                }
            }
            // Blocks until every forked subtask has finished.
            for (FileSearchRecursiveTask task : recursiveTasks) {
                filteredFileList.addAll(task.join());
            }
        }
        return filteredFileList;
    }
}
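This program works fine when the directory does not contain many subdirectories and files, but when the directory is really large, it throws an OutOfMemoryError.
My understanding is that the maximum number of threads (including compensation threads) is bounded, so why does this error occur? Am I missing something in my program?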
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
    at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
    at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
    at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
    at FileSearchRecursiveTask.compute(DirectoryService.java:51)
    at FileSearchRecursiveTask.compute(DirectoryService.java:20)
    at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
    at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
    at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
    at FileSearchRecursiveTask.compute(DirectoryService.java:51)
    at FileSearchRecursiveTask.compute(DirectoryService.java:20)
    at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Solution
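You should not fork new tasks without limit. Basically, fork only as long as there is a chance that another worker thread can pick up the forked job, and evaluate locally otherwise. Then, once you have forked a task, don't call join() right afterwards. The underlying framework will start compensation threads to ensure that your jobs keep making progress instead of leaving all threads blocked waiting for a subtask, but doing so creates a large number of threads that may exceed the system's capabilities.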
Here is a revised version of your code:
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
        List<String> files = task.invoke();
        System.out.println("Total no of files with hello " + files.size());
    }
}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {

    private static final int TARGET_SURPLUS = 3;

    private File path;

    public FileSearchRecursiveTask(File file) {
        this.path = file;
    }

    @Override
    protected List<String> compute() {
        File directory = path;
        if (directory.isDirectory() && directory.canRead()) {
            System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
            return scan(directory);
        }
        return Collections.emptyList();
    }

    private List<String> scan(File directory) {
        File[] fileList = directory.listFiles();
        if (fileList == null || fileList.length == 0) return Collections.emptyList();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        List<String> filteredFileList = new ArrayList<>();
        for (File file : fileList) {
            System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
            if (file.isDirectory()) {
                // Fork only while few locally queued tasks are waiting to be stolen.
                if (getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                    FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                    recursiveTasks.add(task);
                    task.fork();
                }
                // Otherwise scan the subdirectory on the current thread.
                else filteredFileList.addAll(scan(file));
            }
            else if (file.getName().contains("hello")) {
                filteredFileList.add(file.getAbsolutePath());
            }
        }

        // Reclaim tasks nobody has stolen yet, youngest first, and run them locally.
        for (int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
            FileSearchRecursiveTask task = recursiveTasks.get(ix);
            if (task.tryUnfork()) task.complete(scan(task.path));
        }

        // Everything left is either finished or being processed by another worker.
        for (FileSearchRecursiveTask task : recursiveTasks) {
            filteredFileList.addAll(task.join());
        }
        return filteredFileList;
    }
}
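The method that does the processing has been factored out into a method that receives the directory as a parameter, so we can use it locally for arbitrary directories that are not necessarily associated with a FileSearchRecursiveTask instance.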
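Then, the method uses getSurplusQueuedTaskCount() to determine the number of locally queued tasks that have not been picked up by other worker threads. Keeping a few such tasks queued helps balance the work. But if this number exceeds the threshold, the processing is done locally without forking more jobs.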
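To make the throttling idea concrete, here is a minimal, self-contained sketch that applies the same getSurplusQueuedTaskCount() check to a synthetic tree instead of the file system. The class name SurplusThrottleDemo, the FANOUT constant, and the countNodes helper are hypothetical and exist only for illustration; TARGET_SURPLUS uses the same value of 3 as the revised code, which is not a tuned recommendation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo: counts the nodes of a synthetic tree in which every
// node has FANOUT children, down to the given depth.
class SurplusThrottleDemo extends RecursiveTask<Long> {

    static final int TARGET_SURPLUS = 3; // assumed threshold, as in the answer
    static final int FANOUT = 4;         // illustrative branching factor

    private final int depth;

    SurplusThrottleDemo(int depth) {
        this.depth = depth;
    }

    @Override
    protected Long compute() {
        return count(depth);
    }

    // Counts this node plus all descendants, forking only while the
    // current worker has few surplus tasks queued.
    private static long count(int depth) {
        if (depth == 0) return 1;
        long total = 1; // this node
        List<SurplusThrottleDemo> forked = new ArrayList<>();
        for (int i = 0; i < FANOUT; i++) {
            if (ForkJoinTask.getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                // Few queued tasks: offer this subtree to other workers.
                SurplusThrottleDemo task = new SurplusThrottleDemo(depth - 1);
                forked.add(task);
                task.fork();
            } else {
                // Enough work is queued already: process the subtree locally.
                total += count(depth - 1);
            }
        }
        for (SurplusThrottleDemo task : forked) {
            total += task.join();
        }
        return total;
    }

    // Hypothetical helper that runs the demo in its own pool.
    static long countNodes(int depth) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SurplusThrottleDemo(depth));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Nodes: " + countNodes(8));
    }
}
```

Every subtree is either forked as a task or processed by plain recursion on the current thread, so the result does not depend on how many tasks are actually forked; only the amount of queued work changes.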
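After the local processing, it iterates over the tasks and uses tryUnfork() to identify jobs that have not been stolen by other worker threads, and processes them locally. Iterating backwards, so that the youngest jobs are tried first, raises the chance of finding some.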
Only after that does it join() all sub-jobs, which by now are either completed or currently being processed by another worker thread.
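The reclaim-then-join sequence can also be tried in isolation. The following is a rough sketch that sums a range of integers; RangeSumTask, CHUNK, sumDirectly, and sum are hypothetical names chosen for this example, and the sketch deliberately leaves out the surplus check so that only the tryUnfork() and complete() steps are shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo task: sums the integers in the range [from, to).
class RangeSumTask extends RecursiveTask<Long> {

    static final long CHUNK = 10_000; // illustrative chunk size, not tuned

    private final long from;
    private final long to;

    RangeSumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= CHUNK) return sumDirectly(from, to);

        // Fork one subtask per chunk.
        List<RangeSumTask> forked = new ArrayList<>();
        for (long lo = from; lo < to; lo += CHUNK) {
            RangeSumTask task = new RangeSumTask(lo, Math.min(lo + CHUNK, to));
            forked.add(task);
            task.fork();
        }

        // Walk backwards so the most recently forked task is tried first.
        // A task that can be unscheduled was not stolen, so it is computed
        // here and marked as completed with that result.
        for (int i = forked.size() - 1; i >= 0; i--) {
            RangeSumTask task = forked.get(i);
            if (task.tryUnfork()) {
                task.complete(sumDirectly(task.from, task.to));
            }
        }

        // Remaining tasks are finished or running on another worker.
        long total = 0;
        for (RangeSumTask task : forked) {
            total += task.join();
        }
        return total;
    }

    static long sumDirectly(long from, long to) {
        long sum = 0;
        for (long i = from; i < to; i++) sum += i;
        return sum;
    }

    // Hypothetical helper that runs the task in its own pool.
    static long sum(long from, long to) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSumTask(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sum(0, 1_000_000));
    }
}
```

Because tryUnfork() only succeeds for a task that is still queued and has not started running elsewhere, calling complete() on it does not race with another worker, and the later join() simply returns the stored result.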