Worker API 并行化
Worker API 允许任务将工作分解为多个独立单元并行执行,充分利用多核CPU,显著提升构建性能。
Worker API 概念
为什么需要 Worker API
传统任务:
kotlin
@TaskAction
fun process() {
    // Handle the files one after another.
    for (file in files) {
        processFile(file)
    }
}
问题:
- 串行执行
- 无法利用多核
- 处理大量文件慢
Worker API:
kotlin
@TaskAction
fun process() {
    // Submit one unit of work per file so the files can be processed in parallel.
    for (file in files) {
        workerExecutor.noIsolation().submit(ProcessWorkAction::class) {
            inputFile.set(file)
        }
    }
}
优势:
- 并行执行
- 充分利用CPU
- 提升性能
基本用法
定义 WorkAction
kotlin
/**
 * Unit of work that reads one text file and writes its upper-cased content to another file.
 */
abstract class ProcessWorkAction : WorkAction<ProcessWorkAction.Parameters> {

    /** Values supplied to each unit of work: the file to read and the file to write. */
    interface Parameters : WorkParameters {
        val inputFile: RegularFileProperty
        val outputFile: RegularFileProperty
    }

    override fun execute() {
        val source = parameters.inputFile.get().asFile
        val target = parameters.outputFile.get().asFile

        // Process the file: upper-case the whole text and write it out.
        target.writeText(source.readText().uppercase())
    }
}
提交工作
kotlin
/**
 * Task that submits one [ProcessWorkAction] per input file; each result is written to
 * [outputDir] under the input file's name.
 */
abstract class ParallelProcessTask : DefaultTask() {

    @get:Inject
    abstract val workerExecutor: WorkerExecutor

    @get:InputFiles
    abstract val inputFiles: ConfigurableFileCollection

    @get:OutputDirectory
    abstract val outputDir: DirectoryProperty

    @TaskAction
    fun process() {
        for (file in inputFiles) {
            workerExecutor.noIsolation().submit(ProcessWorkAction::class) {
                inputFile.set(file)
                outputFile.set(outputDir.file(file.name))
            }
        }
    }
}
工作单元隔离
noIsolation 无隔离
kotlin
val queue = workerExecutor.noIsolation()
queue.submit(MyWorkAction::class) {
    // Configure the work parameters here.
}
特点:
- 共享类加载器
- 性能最好
- 适合大多数场景
classLoaderIsolation 类加载器隔离
kotlin
// Build the work queue with the "myClasspath" configuration added to its classpath.
val isolatedQueue = workerExecutor.classLoaderIsolation {
    classpath.from(configurations.named("myClasspath"))
}
isolatedQueue.submit(MyWorkAction::class) {
    // Configure the work parameters here.
}
特点:
- 独立类加载器
- 避免类冲突
- 性能适中
processIsolation 进程隔离
kotlin
// Build the work queue with fork options: a 512m heap limit and UTF-8 file encoding.
val forkedQueue = workerExecutor.processIsolation {
    forkOptions {
        maxHeapSize = "512m"
        systemProperty("file.encoding", "UTF-8")
    }
}
forkedQueue.submit(MyWorkAction::class) {
    // Configure the work parameters here.
}
特点:
- 独立进程
- 完全隔离
- 性能最慢
- 适合不稳定的代码
参数传递
WorkParameters
kotlin
/** Values passed to [MyWorkAction]: a message, a count and an input file. */
interface MyParameters : WorkParameters {
    val message: Property<String>
    val count: Property<Int>
    val inputFile: RegularFileProperty
}

/** Work action that prints each of its parameters to standard output. */
abstract class MyWorkAction : WorkAction<MyParameters> {
    override fun execute() {
        val params = parameters
        println(params.message.get())
        println(params.count.get())
        println(params.inputFile.get().asFile.name)
    }
}
传递参数
kotlin
val queue = workerExecutor.noIsolation()
queue.submit(MyWorkAction::class) {
    // Set every parameter that the action reads.
    message.set("Hello")
    count.set(42)
    inputFile.set(file("input.txt"))
}
等待完成
await
kotlin
@TaskAction
fun process() {
    for (file in inputFiles) {
        workerExecutor.noIsolation().submit(ProcessWorkAction::class) {
            inputFile.set(file)
        }
    }

    // Wait for all submitted work to complete before continuing.
    workerExecutor.await()
    println("All work completed")
}
实战案例
案例1:图片压缩
kotlin
/** Parameters for compressing a single image. */
interface ImageParameters : WorkParameters {
    val inputImage: RegularFileProperty
    val outputImage: RegularFileProperty
    val quality: Property<Int>
}

/**
 * Re-encodes one image as a JPEG.
 *
 * The `quality` parameter is divided by 100 before being used as the writer's
 * compression quality, so it is expected to be a percentage.
 */
abstract class CompressImageAction : WorkAction<ImageParameters> {
    override fun execute() {
        val input = parameters.inputImage.get().asFile
        val output = parameters.outputImage.get().asFile
        val quality = parameters.quality.get()

        // ImageIO.read returns null when no registered reader can decode the file.
        val image = requireNotNull(ImageIO.read(input)) { "Unsupported or unreadable image: $input" }

        val writer = ImageIO.getImageWritersByFormatName("jpg").next()
        try {
            val param = writer.defaultWriteParam
            param.compressionMode = ImageWriteParam.MODE_EXPLICIT
            param.compressionQuality = quality / 100f

            // Close the ImageOutputStream as well as the file stream so that
            // everything it buffered reaches the file.
            output.outputStream().use { stream ->
                ImageIO.createImageOutputStream(stream).use { imageStream ->
                    writer.output = imageStream
                    writer.write(null, IIOImage(image, null, null), param)
                }
            }
        } finally {
            // Release the resources held by the writer.
            writer.dispose()
        }
    }
}

/**
 * Task that submits one [CompressImageAction] per image; each result is written to
 * [outputDir] under the image's original file name.
 */
abstract class CompressImagesTask : DefaultTask() {

    @get:Inject
    abstract val workerExecutor: WorkerExecutor

    @get:InputFiles
    abstract val images: ConfigurableFileCollection

    @get:OutputDirectory
    abstract val outputDir: DirectoryProperty

    @get:Input
    abstract val quality: Property<Int>

    @TaskAction
    fun compress() {
        images.forEach { image ->
            workerExecutor.noIsolation().submit(CompressImageAction::class) {
                inputImage.set(image)
                outputImage.set(outputDir.file(image.name))
                // Inside this block `quality` is the work parameter; qualify the task's own property.
                quality.set(this@CompressImagesTask.quality)
            }
        }
    }
}

tasks.register<CompressImagesTask>("compressImages") {
    images.from("src/main/res/drawable-xxhdpi")
    outputDir.set(layout.buildDirectory.dir("compressed"))
    quality.set(85)
}
案例2:代码格式化
kotlin
/** Parameters for formatting a single source file. */
interface FormatParameters : WorkParameters {
    val sourceFile: RegularFileProperty
}

/**
 * Rewrites one source file in place with simple, example-only formatting rules.
 */
abstract class FormatCodeAction : WorkAction<FormatParameters> {
    override fun execute() {
        val file = parameters.sourceFile.get().asFile
        val code = file.readText()

        // Format the code (example only).
        val formatted = code
            .replace("  ", "    ") // turn 2-space runs into 4 spaces
            .replace("\r\n", "\n") // normalize line endings

        file.writeText(formatted)
    }
}

/**
 * Task that submits one [FormatCodeAction] per source file and waits for all of them
 * before printing a summary.
 */
abstract class FormatCodeTask : DefaultTask() {

    @get:Inject
    abstract val workerExecutor: WorkerExecutor

    @get:InputFiles
    abstract val sources: ConfigurableFileCollection

    @TaskAction
    fun format() {
        sources.forEach { source ->
            workerExecutor.noIsolation().submit(FormatCodeAction::class) {
                sourceFile.set(source)
            }
        }

        // Wait for all submitted work before printing the summary.
        workerExecutor.await()
        println("Formatted ${sources.files.size} files")
    }
}
案例3:并行测试
kotlin
/** Parameters for running one test class and writing its report. */
interface TestParameters : WorkParameters {
    val testClass: Property<String>
    val reportFile: RegularFileProperty
}

/** Runs a single test class and writes a one-line result to the report file. */
abstract class RunTestAction : WorkAction<TestParameters> {
    override fun execute() {
        val className = parameters.testClass.get()
        val report = parameters.reportFile.get().asFile

        // Run the test, then write the report.
        val result = runTest(className)
        report.writeText("Test: $className, Result: $result")
    }

    // Placeholder for the actual test logic; always returns "PASSED".
    private fun runTest(className: String): String = "PASSED"
}

/**
 * Task that submits one process-isolated [RunTestAction] per test class name; each
 * report is written to [reportsDir] as `<class name>.txt`.
 */
abstract class ParallelTestTask : DefaultTask() {

    @get:Inject
    abstract val workerExecutor: WorkerExecutor

    @get:Input
    abstract val testClasses: ListProperty<String>

    @get:OutputDirectory
    abstract val reportsDir: DirectoryProperty

    @TaskAction
    fun runTests() {
        for (testClass in testClasses.get()) {
            workerExecutor.processIsolation().submit(RunTestAction::class) {
                // `this.testClass` is the work parameter; bare `testClass` is the loop variable.
                this.testClass.set(testClass)
                reportFile.set(reportsDir.file("$testClass.txt"))
            }
        }
    }
}
性能优化
控制并发数
kotlin
@TaskAction
fun process() {
    // Do not throttle submissions with a Semaphore acquired in this loop: nothing here
    // can release a permit when a work item finishes, so the task would block forever
    // once there are more files than permits.
    // Gradle already limits how many work items run at once via its max-workers
    // setting (`--max-workers` or `org.gradle.workers.max`).
    val queue = workerExecutor.noIsolation()
    files.forEach { file ->
        queue.submit(ProcessWorkAction::class) {
            inputFile.set(file)
        }
    }
}
批处理
kotlin
@TaskAction
fun process() {
    // Submit one unit of work per batch of 10 files.
    for (batch in files.chunked(10)) {
        workerExecutor.noIsolation().submit(BatchProcessAction::class) {
            inputFiles.set(batch)
        }
    }
}
最佳实践
选择适当的隔离级别:
- 默认使用 noIsolation
- 类冲突时使用 classLoaderIsolation
- 不稳定代码使用 processIsolation
合理分解工作:
- 每个工作单元独立
- 避免共享可变状态
- 工作量适中
使用 WorkParameters:
- 不要使用构造函数传参
- 使用 Property API
- 支持序列化
等待完成:
kotlin
// Make sure all submitted work has completed.
workerExecutor.await()
异常处理:
kotlin
override fun execute() {
    // WorkAction itself declares no `logger` member, so obtain one explicitly.
    val logger = org.gradle.api.logging.Logging.getLogger(javaClass)
    try {
        // Work logic goes here.
    } catch (e: Exception) {
        logger.error("Work failed", e)
        // Rethrow so the failure is not swallowed.
        throw e
    }
}
性能监控:
kotlin
// Measure elapsed time with the monotonic clock; wall-clock time can be adjusted mid-build.
val start = System.nanoTime()
workerExecutor.await()
val duration = (System.nanoTime() - start) / 1_000_000
println("Completed in ${duration}ms")