中移短信cmpp协议netty实现编解码
这是一个在netty4框架下实现的cmpp3.0/cmpp2.0短信协议解析及网关端口管理 (master分支是依赖于netty5的)。 代码copy了 huzorro@gmail.com 基于netty3.7的cmpp协议解析 huzorro@gmail.com 的代码
目前已支持发送和解析长文本短们拆分合并,WapPush短信,以及彩信通知类型的短信。可以实现对彩信或者wap-push短信的拦截和加工处理。wap短信的解析使用 [smsj] (https://github.com/marre/smsj)的短信库
代码已经跟华为,东软,亚信的短信网关都做过联调测试,兼容了不同厂家的错误和异常,如果跟网关通信出错,可以打开trace日志查看二进制数据。
##性能测试 在48core,128G内存的物理服务器上测试协议解析效率:35K条/s, cpu使用率25%.
执行mvn package . jdk1.6以上. (如果用netty5则必须使用jdk1.7)
业务层实现接口:BusinessHandlerInterface,或者继承AbstractBusinessHandler抽象类实现业务即可。 连接保活,消息重发,消息持久化,连接鉴权都已封装,不须要业务层再实现。
com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity 表示一个Tcp连接的发起端,或者接收端。用来记录连接的IP.port,以及CMPP协议的用户名,密码,业务处理的ChannelHandler集合等其它端口参数。包含三个字类:
-
com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服务监听端口,包含一个List属性。 一个服务端口包含多个CMPPServerChildEndpointEntity端口
-
com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服务接收端口,包含CMPP连接用户名,密码,以及协议版本等信息
-
com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客户端端口,包含CMPP连接用户名,密码,以及协议版本,以及服务端IP.port. 用于连接服务端
com.zx.sms.connect.manager.EndpointConnector 负责一个端口的打开,关闭,查看当前连接数,新增连接,移除连接。每个端口的实体类都对应一个EndpointConnector.当CMPP连接建立完成,将连接加入连接器管理,并给pipeLine上挂载业务处理的ChannelHandler.
-
com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 这个类的open()调用netty的ServerBootstrap.bind()开一个服务监听
-
com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用来收集CMPPServerChildEndpointEntity端口下的所有连接。它的open()方法为空.
-
com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 这个类open()调用netty的Bootstrap.connect()开始一个TCP连接
com.zx.sms.connect.manager.EndpointManager 该类是单例模式,管理所有端口,并负责所有端口的打开,关闭,以及端口信息保存,以及连接断线重连。
com.zx.sms.session.cmpp.SessionLoginManager 这是一个netty的ChannelHandler实现,主要负责CMPP连接的建立。当CMPP连接建立完成后,会调用EndpointConnector.addChannel(channel)方法,把连接加入连接器管理,连接器负责给channel的pipeline上挂载业务处理的Handler,最后触发 SessionState.Connect事件,通知业务处理Handler连接已建立成功。
com.zx.sms.session.cmpp.SessionStateManager 这是一个netty的ChannelHandler实现。负责每个连接上CMPP消息的存储,短信重发,流量窗口控制,过期短信的处理
CMPP20MessageCodecAggregator [2.0协议] CMPPMessageCodecAggregator [这是3.0协议] 聚合了CMPP主要消息协议的解析,编码,长短信拆分,合并处理。
使用BDB的StoreMap实现消息持久化,防止系统意外丢失短信。
- 程序启动类 new 一个CMPPEndpointEntity的实体类并设置IP,port,用户名,密码,业务处理的Handler等参数,
- 程序启动类 调用EndpointManager.addEndpointEntity(endpoint)方法,将端口加入管理器
- 程序启动类 调用EndpointManager.openAll()或者EndpointManager.openEndpoint()方法打开端口。
- EndpointManager会调用EndpointEntity.buildConnector()创建一个端口连接器,并调用EndpointConnector.open()方法打开端口。
- 如果是CMPPClientEndpointEntity的话,就会向服务器发起TCP连接请求,如果是CMPPServerEndpointEntity则会在本机开启一个服务端口等客户端连接。
- TCP连接建立完成后。netty会调用EndpointConnector.initPipeLine()方法初始化PipeLine,把CMPP协议解析器,SessionLoginManager加到PipeLine里去,然后netty触发ChannelActive事件。
- 在SessionLoginManager类里,客户端收到ChannelActive事件后会发送一个CMPPConnnect消息,请求建立CMPP连接.
- 同样在SessionLoginManager.channelRead()方法里,服务端会收到CMPPConnnect消息,开始对用户名,密码进行鉴权,并给客户端鉴权结果。
- 鉴权通过后,SessionLoginManager调用EndpointConnector.addChannel(channel)方法,把channel加入ArrayList,并给pipeLine上挂载SessionStateManager和业务处理的ChannelHandler。
- EndpointConnector.addChannel(channel)完成后,SessionLoginManager调用ctx.fireUserEventTriggered()方法,触发 SessionState.Connect事件。
以上CMPP连接建立完成。
- 业务处理类收到SessionState.Connect事件,开始业务处理,如下发短信。
- SessionStateManager会拦截所有read()和write()的消息,进行消息持久化,消息重发,流量控制。
public class TestCMPPEndPoint { private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class); @Test public void testCMPPEndpoint() throws Exception { final CMPPEndpointManager manager = CMPPEndpointManager.INS; // 注意下面所有Entity的Id字段是不允许重复的,Id标识整个JVM唯一的一个网关端口 // 创建一个CMPP的服务端,模拟一个短信网关 CMPPServerEndpointEntity server = new CMPPServerEndpointEntity(); server.setId("server"); server.setHost("127.0.0.1"); server.setPort(7891); server.setValid(true); server.setUseSSL(false); //不使用SSL加密流量 //给这个网关增加一个允许接入的账号 CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity(); child.setId("child"); child.setChartset(Charset.forName("utf-8")); child.setGroupName("test"); child.setUserName("901782"); child.setPassword("ICP"); child.setValid(true); child.setWindows((short)16); child.setVersion((short)48); child.setMaxChannels((short)20); child.setRetryWaitTimeSec((short)100); child.setMaxRetryCnt((short)3); //给这个账号添加一个业务处理: SessionConnectedHandler 。当client连接完成,用户名密码正确后,立即给Client发送200000条短信 List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>(); serverhandlers.add(new SessionConnectedHandler()); child.setBusinessHandlerSet(serverhandlers); server.addchild(child); //把Server加入到管理器 manager.addEndpointEntity(server); //模拟创建一个Client,要去连接上面的网关,使用上面账号密码 CMPPClientEndpointEntity client = new CMPPClientEndpointEntity(); client.setId("client"); client.setHost("127.0.0.2,127.0.0.3,127.0.0.1"); client.setPort(7891); client.setChartset(Charset.forName("utf-8")); client.setGroupName("test"); client.setUserName("901782"); client.setPassword("ICP"); client.setReadLimit(0); child.setWriteLimit(100); client.setWindows((short)16); client.setVersion((short)48); client.setRetryWaitTimeSec((short)100); client.setUseSSL(false); //不使用SSL加密流量 //这里是给Client增加一个处理业务:MessageReceiveHandler ,统计接收到的短信条数和速度,并打印到控制台。 List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>(); clienthandlers.add(new MessageReceiveHandler()); client.setBusinessHandlerSet(clienthandlers); //把Client加入到管理器 manager.addEndpointEntity(client); //管理器打开端口 manager.openAll(); //主线程挂起,在控制台看实时打印的Log Thread.sleep(300000); //关闭所有连接和端口 CMPPEndpointManager.INS.close(); } }