多线程 – 获取任务节点上执行程序的核心数的方法?

前端之家收集整理的这篇文章主要介绍了多线程 – 获取任务节点上执行程序的核心数的方法?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
例如.我需要获得所有可用执行程序及其各自的多线程容量的列表(不是总多线程容量,sc.defaultParallelism已经处理过).

由于此参数与实现有关(YARN和spark-standalone具有不同的分配核心策略)和情境(由于动态分配和长期作业运行,它可能会波动).我不能用其他方法估计这个.有没有办法在分布式转换中使用Spark API检索此信息? (例如,TaskContext,SparkEnv)

更新至于Spark 1.6,我尝试了以下方法

1)运行具有许多分区的一阶段作业(>> defaultParallelism)并计算每个executorID的独特threadID数:

val n = sc.defaultParallelism * 16
sc.parallelize(n,n).map(v => SparkEnv.get.executorID -> Thread.currentThread().getID)
.groupByKey()
.mapValue(_.distinct)
.collect()

然而,这导致估计高于实际多线程容量,因为每个Spark执行器使用过度配置的线程池.

2)类似于1,除了n = defaultParallesim,并且在每个任务中我添加一个延迟以防止资源协商器进行不平衡分片(快速节点完成它的任务并在慢节点开始运行之前请求更多):

val n = sc.defaultParallelism
sc.parallelize(n,n).map{
  v =>
    Thread.sleep(5000)
    SparkEnv.get.executorID -> Thread.currentThread().getID
}
.groupByKey()
.mapValue(_.distinct)
.collect()

它大部分时间都可以工作,但速度远远超过必要的速度,可能会被非常不平衡的集群或任务推测所打破.

3)我没试过这个:用java反射来读取BlockManager.numUsableCores,这显然不是一个稳定的解决方案,内部实现可能随时改变.

请告诉我你是否找到了更好的东西.

解决方法

使用Spark rest API非常容易.你必须获得应用程序ID:
val applicationId = spark.sparkContext.applicationId

ui网址:

val baseUrl = spark.sparkContext.uiWebUrl

查询

val url = baseUrl.map { url => 
  s"${url}/api/v1/applications/${applicationId}/executors"
}

使用Apache HTTP库(已经在Spark依赖项中,从https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients改编):

import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.client.methods.HttpGet
import scala.util.Try

val client = new DefaultHttpClient()

val response = url
  .flatMap(url => Try{client.execute(new HttpGet(url))}.toOption)
  .flatMap(response => Try{
    val s = response.getEntity().getContent()
    val json = scala.io.Source.fromInputStream(s).getLines.mkString
    s.close
    json
  }.toOption)

和json4s:

import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats

case class ExecutorInfo(hostPort: String,totalCores: Int)

val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try {
  parse(json).extract[List[ExecutorInfo]]
}.toOption)

只要您保留应用程序ID和ui URL并打开ui端口到外部连接,您就可以从任何任务执行相同的操作.

猜你在找的Java相关文章