Skip to content

collapse executor 是一个高性能、低延迟的批量合并执行器,可有效支持高并发的热点请求,支持与Spring Boot集成,帮助开发者快速构建高性能的微服务,提高服务资源利用率的同时降低服务响应时间。涉及技术点包括CompletableFuture、Spring Boot、WebClient、Servlet Async等主流技术栈

License

Notifications You must be signed in to change notification settings

icodening/collapse-executor

Repository files navigation

项目简介

collapse executor 是一个高性能、低延迟的批量执行器,可有效支持高并发的热点请求,支持与Spring Boot集成,帮助开发者快速构建高性能的微服务,提高服务资源利用率的同时降低服务响应时间。涉及技术点包括CompletableFuture、Spring Boot、WebClient、Servlet Async等主流技术栈

适用场景

  1. 部分接口的并发量很高,想提升性能的同时又不希望引入缓存服务,减少额外维护成本。
  2. 单次调用转批量调用。在同一时刻将不同线程的参数合并为一个参数再执行批处理逻辑。
  3. 希望降低客户端的远程连接数量。
  4. 希望减少服务端工作线程消耗。
  5. 批量获取基于DB/redis的自增序列

项目亮点

  1. API简单易上手,扩展难度低

默认提供了spring-boot集成,可在spring-boot环境下开箱即用。且核心逻辑已高度抽象,二次扩展实现简单,仅需编写合并请求以及拆分响应两块逻辑。

  1. 高性能0延迟,发起批量请求时无需等待时间窗口

巧妙的利用了提交任务->任务执行两个行为之间的时间间隔进行输入的批量收集,相比于等待一个时间窗口(如等待2ms) 的设计,该设计可在批处理下依然保证极高的实时性,且在高并发的场景下也有着不俗的批量收集能力。

  1. 设计简单易维护,无需维护第三方服务

核心逻辑完全不依赖任何第三方库/服务,完全基于JDK库进行实现。

流程对比

以下两张图解释了有无折叠执行器的调用差异。当无请求折叠时,请求与网络连接数的比例为1:1;当使用请求折叠后,请求与网络连接数的比例为N:1,即多个请求会合并为一个请求发起远程调用,由此可以做到减少I/O次数、减少后端压力,从而提升调用性能降低RT。

无请求折叠

有请求折叠

快速开始

必备条件: JDK8及以上

一.入门: 自动折叠及拆分

该方式适用于简单的幂等请求的场景,通常需要用户手动指定本次调用所属的并发分组。

以下该案例表示将当前传入的Callable按照 example group 进行分组。 同一并发分组下的Callable仅执行一次,并将这一次的返回结果作为同一并发分组发起的请求结果

1.同步阻塞调用

BlockingCollapseExecutorExample.java

public class BlockingCollapseExecutorExample { public static void main(String[] args) throws Throwable { BlockingCallableGroupCollapseExecutor blockingCollapseExecutor = new BlockingCallableGroupCollapseExecutor(); String outputString = blockingCollapseExecutor.execute("example group", () -> "Hello World Collapse Executor. Blocking"); System.out.println(outputString); } }

2.异步调用

AsyncCollapseExecutorExample.java

public class AsyncCollapseExecutorExample { public static void main(String[] args) throws Throwable { AsyncCallableGroupCollapseExecutor asyncCallableGroupCollapseExecutor = new AsyncCallableGroupCollapseExecutor(); asyncCallableGroupCollapseExecutor.setExecutor(new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> { Thread thread = new Thread(r); thread.setDaemon(true); return thread; })); asyncCallableGroupCollapseExecutor.execute("example group", () -> "Hello World Collapse Executor. Async") .thenAccept(System.out::println) .thenRun(() -> System.exit(0)); System.in.read(); } }

3.非阻塞异步调用

FutureCollapseExecutorExample.java

这种方式必须保证Callable中的处理逻辑是非阻塞的!!!

public class FutureCollapseExecutorExample { public static void main(String[] args) throws Throwable { FutureCallableGroupCollapseExecutor futureCollapseExecutor = new FutureCallableGroupCollapseExecutor(); futureCollapseExecutor.execute("example group", () -> CompletableFuture.completedFuture("Hello World Collapse Executor. Future")) .thenAccept(System.out::println) .thenRun(() -> System.exit(0)); System.in.read(); } }

二. 进阶: 手动折叠及拆分

该方式适用于后端服务提供了批处理接口的场景,将同并发下其他线程的输入合并调用后端服务的批处理接口,可以减少多次不必要的单次调用,如批量查询。
由于这种方式可以更好的处理输入组,故该方式合并效率可以更高,由此带来的性能提升也会更高。

1.同步阻塞调用

CustomBlockingCollapseExecutor.java

public class CustomBlockingCollapseExecutor extends CollapseExecutorBlockingSupport<Long, UserEntity, Map<Long, UserEntity>> { @Override protected Map<Long, UserEntity> doExecute(Collection<Input<Long>> inputs) { //此处编写批量请求逻辑 return null; } @Override protected void bindingOutput(Map<Long, UserEntity> users, List<Bundle<Long, UserEntity>> bundles) { //此处编写批量响应与原始请求关联的逻辑 } }

2.异步调用

CustomAsyncCollapseExecutor.java
与同步阻塞调用类似,主要差异为需要设置一个异步线程池,用于执行批量请求逻辑。

public class CustomBlockingCollapseExecutor extends CollapseExecutorAsyncSupport<Long, UserEntity, Map<Long, UserEntity>> { @Override protected Map<Long, UserEntity> doExecute(Collection<Input<Long>> inputs) { //此处编写批量请求逻辑 return null; } @Override protected void bindingOutput(Map<Long, UserEntity> users, List<Bundle<Long, CompletableFuture<UserEntity>>> bundles) { for (Bundle<Long, CompletableFuture<UserEntity>> bundle : bundles) { Long inputId = bundle.getInput(); UserEntity userEntity = users.get(inputId); //需要返回CompletableFuture类型 bundle.bindOutput(CompletableFuture.completedFuture(userEntity)); } } }

三. Spring Boot集成

collapse executor已经对Spring Boot进行了适配,利用Spring Boot自动装配能力简化了在Spring环境下的使用体验,通过简单的配置即可使用。
在Spring Boot中,collapse executor对主要的几个组件进行了适配,分别是 RestTemplateWebClientServlet

1. application.yaml配置解释

以下是一个折叠执行器的yaml配置例子及解释,详情可参考collapse-executor-sample-spring-boot中的application.yaml

collapse: executor: enabled: true # 折叠执行器的总开关,配置为false后,后面的所有配置(servlet、rest-template、web-client)将失效 wait-threshold: 10 #批量收集的最小阈值 collecting-wait-time: 0 #声明批量收集未达到阈值时的行为。 #collecting-wait-time < 0时:不做任何等待,立即发起调用 #collecting-wait-time = 0时:让出当前收集线程时间片等待下次调度后再发起调用 #collecting-wait-time > 0时:等待指定的时间后再发起调用,单位为毫秒(ms) rest-template: enabled: true #true表示打开RestTemplate的合并拦截器 collapse-policies: #声明合并策略,可以配置多个 sample-policy1: #策略名字 collapse-request-headers: #声明需要合并的请求头名字 - authorization collapse-request-queries: #声明需要合并的查询参数名字 - sample sample-policy2: collapse-request-headers: - user-id collapse-request-queries: - sample collapse-groups: # collapse-policy-name可以省略,省略后使用默认策略仅合并path相同的请求,而忽略其他任何参数 # 例如:此时并发发起 /user/2、/user/2、/article/2、/article/2 [4]个请求,由于前两个请求满足 /user/*,则会将前两个合并为 [1] 个请求发起调用; # 而第三第四个/article/2请求没有匹配到配置中的声明的折叠组,则依然会按照 [2] 个请求分别发起调用 - uris: - /user/* - /test/noop* #------------------------------------------------------------------ # 例如:此时并发发起 /samples/1(header:authorization=test), /samples/1(header:authorization=test), /samples/1(header:authorization=demo) [3]个请求, # 由于前两个请求携带的[authorization]请求头值相同,则会将前两个合并为 [1] 个请求发起调用; # 而第三个请求则会单独发起调用,与前两个不是同一组! - collapse-policy-name: sample-policy1 #需要与前面声明的策略名对应 uris: - /samples/*

2.配置拦截器

@Configuration public class SampleConfiguration { /**  * 基于RestTemplate的使用方式  */ @Bean public RestTemplate restTemplate(CollapseHttpRequestInterceptor collapseHttpRequestInterceptor) { //注入CollapseHttpRequestInterceptor拦截器实例,并添加到RestTemplate实例上 return new RestTemplateBuilder() .interceptors(collapseHttpRequestInterceptor) .build(); } /**  * 基于WebClient的使用方式  */ @Bean public WebClient webClient(CollapseExchangeFilterFunction exchangeFilterFunction) { //注入CollapseExchangeFilterFunction过滤器实例,并添加到WebClient实例上 return WebClient.builder().filter(exchangeFilterFunction).build(); } }

3. 启动SpringBootSampleApplication查看结果

业务逻辑位于 AbstractBlockingCallSample

public abstract class AbstractBlockingCallSample { //已省略前后无关代码 @EventListener(ApplicationReadyEvent.class) public void processOnStarted() { UriComponentsBuilder baseUriBuilder = UriComponentsBuilder.fromUriString("http://localhost").port(serverPort); try { System.out.println("--------------------------------[" + prefix + "] start----------------------------------"); //查询id为1-50之间的所有用户,预期打印结果为执行了50次 queryId1between50(executorService, baseUriBuilder); //批处理查询id为1-50之间的所有用户,预期打印结果小于50次 queryId1between50Batch(); //单条查询id为1-2之间的所有用户,预期打印结果小于50次 queryId1between2(executorService, baseUriBuilder); System.out.println("--------------------------------[" + prefix + "] end------------------------------------\n\n\n"); } catch (InterruptedException e) { throw new RuntimeException(e); } } }

性能对比

一、Servlet

结论:相比于未使用折叠,TPS提升高达94%

服务参数 server.tomcat.threads.max=200 服务地址(位于 collapse-executor-samples 中的 StressTestController, 后端均延迟[100ms]后响应,逻辑一致无差异) http://localhost:8080/test/collapse100 http://localhost:8080/test/noop100 测试参数 400用户线程数,持续压测5分钟 

开启折叠

http://localhost:8080/test/collapse100 启用请求折叠测试结果 TPS 3785/s RT99 115ms 

with-collapse

关闭折叠

http://localhost:8080/test/noop100 关闭请求折叠测试结果 TPS 1951/s RT99 211ms 

without-collapse

二、RestTemplate

结论:相比于未使用折叠,TPS提升高达80%

RestTemplate默认配置,差异仅为是否包含折叠执行拦截器(CollapseHttpRequestInterceptor) RestTemplate调用地址http://localhost:8080/test/noop0 200用户线程数,持续压测1分钟 

开启折叠

启用请求折叠测试结果 TPS 15282/s RT99 59ms 

with-collapse

关闭折叠

启用请求折叠测试结果 TPS 8455/s RT99 180ms 

without-collapse

三、WebClient

结论:相比于未使用折叠,TPS提升高达43%

WebClient默认配置,差异仅为是否包含折叠执行拦截器(CollapseExchangeFilterFunction) WebClient调用地址http://localhost:8080/test/noop0 200用户线程数,持续压测1分钟 

开启折叠

启用请求折叠测试结果 TPS 14276/s RT99 35ms 

with-collapse

关闭折叠

启用请求折叠测试结果 TPS 9971/s RT99 111ms 

without-collapse

工作流程

处理流程

About

collapse executor 是一个高性能、低延迟的批量合并执行器,可有效支持高并发的热点请求,支持与Spring Boot集成,帮助开发者快速构建高性能的微服务,提高服务资源利用率的同时降低服务响应时间。涉及技术点包括CompletableFuture、Spring Boot、WebClient、Servlet Async等主流技术栈

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages