Skip to content

Worker API 并行化

源:Gradle 官方文档 - Worker API

Worker API 允许任务将工作分解为多个独立单元并行执行,充分利用多核CPU,显著提升构建性能。

Worker API 概念

为什么需要 Worker API

传统任务

kotlin
/**
 * Traditional task action: every file is handled one after another on the
 * thread that runs the task.
 */
@TaskAction
fun process() {
    // Sequential processing: the next file starts only after the previous one returns.
    for (file in files) {
        processFile(file)
    }
}

问题

  • 串行执行
  • 无法利用多核
  • 处理大量文件慢

Worker API

kotlin
/**
 * Worker API task action: submits one [ProcessWorkAction] per file instead of
 * processing the files inline.
 */
@TaskAction
fun process() {
    // One non-isolated work queue is enough for all submissions.
    val queue = workerExecutor.noIsolation()
    for (file in files) {
        // Parallel processing: each file becomes its own unit of work.
        queue.submit(ProcessWorkAction::class) { inputFile.set(file) }
    }
}

优势

  • 并行执行
  • 充分利用CPU
  • 提升性能

基本用法

定义 WorkAction

kotlin
/**
 * Unit of work that reads [Parameters.inputFile] as text and writes the
 * upper-cased content to [Parameters.outputFile].
 */
abstract class ProcessWorkAction : WorkAction<ProcessWorkAction.Parameters> {

    /** Inputs handed to a single execution of this action. */
    interface Parameters : WorkParameters {
        val inputFile: RegularFileProperty
        val outputFile: RegularFileProperty
    }

    override fun execute() {
        val source = parameters.inputFile.asFile.get()
        val target = parameters.outputFile.asFile.get()

        // Process the file: read everything, upper-case it, write it out.
        target.writeText(source.readText().uppercase())
    }
}

提交工作

kotlin
/**
 * Task that submits one [ProcessWorkAction] per input file; each result is
 * written into [outputDir] under the input file's own name.
 */
abstract class ParallelProcessTask : DefaultTask() {

    @get:Inject
    abstract val workerExecutor: WorkerExecutor

    @get:InputFiles
    abstract val inputFiles: ConfigurableFileCollection

    @get:OutputDirectory
    abstract val outputDir: DirectoryProperty

    @TaskAction
    fun process() {
        val queue = workerExecutor.noIsolation()
        for (file in inputFiles) {
            queue.submit(ProcessWorkAction::class) {
                inputFile.set(file)
                // Output keeps the input's file name, relocated into outputDir.
                outputFile.set(outputDir.file(file.name))
            }
        }
    }
}

工作单元隔离

noIsolation 无隔离

kotlin
// Obtain a work queue without isolation, then submit the action to it.
val workQueue = workerExecutor.noIsolation()
workQueue.submit(MyWorkAction::class) {
    // Configure the work parameters here.
}

特点

  • 共享类加载器
  • 性能最好
  • 适合大多数场景

classLoaderIsolation 类加载器隔离

kotlin
// Build a work queue whose actions run with the "myClasspath" configuration
// added to their classpath, then submit the action to it.
val isolatedQueue = workerExecutor.classLoaderIsolation {
    classpath.from(configurations.named("myClasspath"))
}
isolatedQueue.submit(MyWorkAction::class) {
    // Configure the work parameters here.
}

特点

  • 独立类加载器
  • 避免类冲突
  • 性能适中

processIsolation 进程隔离

kotlin
// Build a work queue backed by a forked process with its own heap limit and
// system properties, then submit the action to it.
val forkedQueue = workerExecutor.processIsolation {
    forkOptions {
        maxHeapSize = "512m"
        systemProperty("file.encoding", "UTF-8")
    }
}
forkedQueue.submit(MyWorkAction::class) {
    // Configure the work parameters here.
}

特点

  • 独立进程
  • 完全隔离
  • 性能开销最大(需要启动独立的 worker 进程)
  • 适合不稳定的代码

参数传递

WorkParameters

kotlin
/** Parameters consumed by [MyWorkAction]. */
interface MyParameters : WorkParameters {
    val message: Property<String>
    val count: Property<Int>
    val inputFile: RegularFileProperty
}

/** Prints the message, the count, and the input file's name, in that order. */
abstract class MyWorkAction : WorkAction<MyParameters> {
    override fun execute() {
        with(parameters) {
            println(message.get())
            println(count.get())
            println(inputFile.get().asFile.name)
        }
    }
}

传递参数

kotlin
// Values are handed to the action by setting the properties of its parameters object.
val queue = workerExecutor.noIsolation()
queue.submit(MyWorkAction::class) {
    message.set("Hello")
    count.set(42)
    inputFile.set(file("input.txt"))
}

等待完成

await

kotlin
/**
 * Submits one [ProcessWorkAction] per input file, waits for all of them, and
 * then prints a completion message.
 */
@TaskAction
fun process() {
    val queue = workerExecutor.noIsolation()
    for (file in inputFiles) {
        queue.submit(ProcessWorkAction::class) { inputFile.set(file) }
    }

    // Wait until all submitted work has finished.
    workerExecutor.await()

    println("All work completed")
}

实战案例

案例1:图片压缩

kotlin
/** Parameters for compressing a single image. */
interface ImageParameters : WorkParameters {
    val inputImage: RegularFileProperty
    val outputImage: RegularFileProperty

    /** Compression quality as a percentage; it is divided by 100 before being given to the writer. */
    val quality: Property<Int>
}

/**
 * Re-encodes [ImageParameters.inputImage] as a JPEG at the requested quality
 * and writes the result to [ImageParameters.outputImage].
 *
 * @throws IllegalStateException if no ImageIO reader can decode the input file.
 */
abstract class CompressImageAction : WorkAction<ImageParameters> {
    override fun execute() {
        val input = parameters.inputImage.get().asFile
        val output = parameters.outputImage.get().asFile
        val quality = parameters.quality.get()

        // ImageIO.read returns null (rather than throwing) when no registered
        // reader understands the file, so fail with a message naming the file.
        val image = checkNotNull(ImageIO.read(input)) {
            "Cannot decode image (unsupported or corrupt): $input"
        }

        // Compress the image.
        val writer = ImageIO.getImageWritersByFormatName("jpg").next()
        try {
            val param = writer.defaultWriteParam
            param.compressionMode = ImageWriteParam.MODE_EXPLICIT
            param.compressionQuality = quality / 100f

            output.outputStream().use { stream ->
                // The ImageOutputStream buffers internally; closing it is what
                // pushes the remaining bytes into the underlying file stream.
                ImageIO.createImageOutputStream(stream).use { imageStream ->
                    writer.output = imageStream
                    writer.write(null, IIOImage(image, null, null), param)
                }
            }
        } finally {
            // Release the resources held by the writer.
            writer.dispose()
        }
    }
}

/**
 * Submits one [CompressImageAction] per file in [images]; each result is
 * written into [outputDir] under the source file's own name.
 */
abstract class CompressImagesTask : DefaultTask() {
    @get:Inject
    abstract val workerExecutor: WorkerExecutor

    @get:InputFiles
    abstract val images: ConfigurableFileCollection

    @get:OutputDirectory
    abstract val outputDir: DirectoryProperty

    @get:Input
    abstract val quality: Property<Int>

    @TaskAction
    fun compress() {
        val queue = workerExecutor.noIsolation()
        images.forEach { image ->
            queue.submit(CompressImageAction::class) {
                inputImage.set(image)
                // NOTE(review): the output keeps the source file name and extension
                // even though the content is always written as JPEG — confirm intended.
                outputImage.set(outputDir.file(image.name))
                // Qualified `this` picks the task's property, not the parameter of the same name.
                quality.set(this@CompressImagesTask.quality)
            }
        }
    }
}

tasks.register<CompressImagesTask>("compressImages") {
    images.from("src/main/res/drawable-xxhdpi")
    outputDir.set(layout.buildDirectory.dir("compressed"))
    quality.set(85)
}

案例2:代码格式化

kotlin
/** Parameters for formatting a single source file. */
interface FormatParameters : WorkParameters {
    val sourceFile: RegularFileProperty
}

/**
 * Rewrites [FormatParameters.sourceFile] in place: every run of two spaces
 * becomes four spaces and Windows line endings become `\n`.
 */
abstract class FormatCodeAction : WorkAction<FormatParameters> {
    override fun execute() {
        val file = parameters.sourceFile.get().asFile
        val code = file.readText()

        // Format the code (illustrative only: the replacement applies to every
        // two-space run in the file, not just to leading indentation).
        val formatted = code
            .replace("  ", "    ") // 2 spaces -> 4 spaces
            .replace("\r\n", "\n") // normalize line endings

        file.writeText(formatted)
    }
}

/**
 * Submits one [FormatCodeAction] per file in [sources], waits for all of them,
 * and prints how many files were formatted.
 */
abstract class FormatCodeTask : DefaultTask() {
    @get:Inject
    abstract val workerExecutor: WorkerExecutor

    @get:InputFiles
    abstract val sources: ConfigurableFileCollection

    @TaskAction
    fun format() {
        val queue = workerExecutor.noIsolation()
        sources.forEach { source ->
            queue.submit(FormatCodeAction::class) {
                sourceFile.set(source)
            }
        }

        // Wait for the workers so the summary is printed after the files are written.
        workerExecutor.await()
        println("Formatted ${sources.files.size} files")
    }
}

案例3:并行测试

kotlin
/** Parameters for running a single test class. */
interface TestParameters : WorkParameters {
    val testClass: Property<String>
    val reportFile: RegularFileProperty
}

/** Runs one test class and writes a one-line result into the report file. */
abstract class RunTestAction : WorkAction<TestParameters> {
    override fun execute() {
        val className = parameters.testClass.get()
        val report = parameters.reportFile.get().asFile

        // Run the test, then write the report.
        val result = runTest(className)
        report.writeText("Test: $className, Result: $result")
    }

    // Placeholder for the real test logic; always reports success.
    private fun runTest(className: String): String = "PASSED"
}

/**
 * Submits one process-isolated [RunTestAction] per entry in [testClasses];
 * each report is written to `<class name>.txt` inside [reportsDir].
 */
abstract class ParallelTestTask : DefaultTask() {
    @get:Inject
    abstract val workerExecutor: WorkerExecutor

    @get:Input
    abstract val testClasses: ListProperty<String>

    @get:OutputDirectory
    abstract val reportsDir: DirectoryProperty

    @TaskAction
    fun runTests() {
        val queue = workerExecutor.processIsolation()
        for (testClass in testClasses.get()) {
            queue.submit(RunTestAction::class) {
                // `this.testClass` is the work parameter; bare `testClass` is the loop variable.
                this.testClass.set(testClass)
                reportFile.set(reportsDir.file("$testClass.txt"))
            }
        }
    }
}

性能优化

控制并发数

kotlin
/**
 * Submits one [ProcessWorkAction] per file while keeping at most
 * `availableProcessors()` submissions outstanding at any time.
 *
 * Files are submitted in windows of that size and the task waits for each
 * window to finish before submitting the next. A semaphore that is acquired
 * on submission but never released would block the task forever as soon as
 * there are more files than permits, so none is used here.
 */
@TaskAction
fun process() {
    val maxInFlight = Runtime.getRuntime().availableProcessors()
    val queue = workerExecutor.noIsolation()

    files.chunked(maxInFlight).forEach { window ->
        window.forEach { file ->
            queue.submit(ProcessWorkAction::class) {
                inputFile.set(file)
            }
        }
        // Drain the current window before starting the next one.
        workerExecutor.await()
    }
}

批处理

kotlin
/**
 * Groups the files into batches and submits one [BatchProcessAction] per
 * batch instead of one work item per file.
 */
@TaskAction
fun process() {
    val queue = workerExecutor.noIsolation()
    // 10 files per batch.
    for (batch in files.chunked(10)) {
        queue.submit(BatchProcessAction::class) { inputFiles.set(batch) }
    }
}

最佳实践

选择适当的隔离级别

  • 默认使用 noIsolation
  • 类冲突时使用 classLoaderIsolation
  • 不稳定代码使用 processIsolation

合理分解工作

  • 每个工作单元独立
  • 避免共享可变状态
  • 单个工作单元的工作量适中(过小则调度开销占比高,过大则并行度不足)

使用 WorkParameters

  • 不要使用构造函数传参
  • 使用 Property API
  • 参数类型必须可序列化(进程隔离时参数需要跨进程传递)

等待完成

kotlin
workerExecutor.await() // make sure all submitted work has completed

异常处理

kotlin
// Wraps the work in a try/catch that logs the failure and then rethrows it,
// so the exception still propagates to the caller.
override fun execute() {
    try {
        // Work logic goes here.
    } catch (e: Exception) {
        // NOTE(review): `logger` is not declared in this snippet — confirm one is in scope.
        logger.error("Work failed", e)
        throw e
    }
}

性能监控

kotlin
// Measure the wall-clock time spent waiting for the submitted work to finish.
val duration = kotlin.system.measureTimeMillis {
    workerExecutor.await()
}
println("Completed in ${duration}ms")