例如.我需要获得所有可用执行程序及其各自的多线程容量的列表(不是总多线程容量,sc.defaultParallelism已经处理过).
由于此参数与实现有关(YARN和spark-standalone具有不同的分配核心策略)和情境(由于动态分配和长期作业运行,它可能会波动).我不能用其他方法估计这个.有没有办法在分布式转换中使用Spark API检索此信息? (例如,TaskContext,SparkEnv)
更新至于Spark 1.6,我尝试了以下方法:
1)运行具有许多分区的一阶段作业(>> defaultParallelism)并计算每个executorID的独特threadID数:
val n = sc.defaultParallelism * 16 sc.parallelize(n,n).map(v => SparkEnv.get.executorID -> Thread.currentThread().getID) .groupByKey() .mapValue(_.distinct) .collect()
然而,这导致估计高于实际多线程容量,因为每个Spark执行器使用过度配置的线程池.
2)类似于1,除了n = defaultParallesim,并且在每个任务中我添加一个延迟以防止资源协商器进行不平衡分片(快速节点完成它的任务并在慢节点开始运行之前请求更多):
val n = sc.defaultParallelism sc.parallelize(n,n).map{ v => Thread.sleep(5000) SparkEnv.get.executorID -> Thread.currentThread().getID } .groupByKey() .mapValue(_.distinct) .collect()
它大部分时间都可以工作,但速度远远超过必要的速度,可能会被非常不平衡的集群或任务推测所打破.
3)我没试过这个:用java反射来读取BlockManager.numUsableCores,这显然不是一个稳定的解决方案,内部实现可能随时改变.
请告诉我你是否找到了更好的东西.
解决方法
使用Spark rest API非常容易.你必须获得应用程序ID:
val applicationId = spark.sparkContext.applicationId
ui网址:
val baseUrl = spark.sparkContext.uiWebUrl
和查询:
val url = baseUrl.map { url => s"${url}/api/v1/applications/${applicationId}/executors" }
使用Apache HTTP库(已经在Spark依赖项中,从https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients改编):
import org.apache.http.impl.client.DefaultHttpClient import org.apache.http.client.methods.HttpGet import scala.util.Try val client = new DefaultHttpClient() val response = url .flatMap(url => Try{client.execute(new HttpGet(url))}.toOption) .flatMap(response => Try{ val s = response.getEntity().getContent() val json = scala.io.Source.fromInputStream(s).getLines.mkString s.close json }.toOption)
和json4s:
import org.json4s._ import org.json4s.jackson.JsonMethods._ implicit val formats = DefaultFormats case class ExecutorInfo(hostPort: String,totalCores: Int) val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try { parse(json).extract[List[ExecutorInfo]] }.toOption)
只要您保留应用程序ID和ui URL并打开ui端口到外部连接,您就可以从任何任务执行相同的操作.