您好,欢迎来到九壹网。
搜索
您的当前位置:首页Netty的Timer管理

Netty的Timer管理

来源:九壹网


Netty的Timer管理–开源的魅力

这里的Timer,是指定时器,现代操作系统,定时器无处不在,以至于有些将linux kernel的书,都需要单独列出一章,来将linux是如何管理这些定时器的。管理定时器其实主要的步骤有以下3步:

1.生成定时器(这里面最终要的是给予定时器其到期时间以及到期后要产生的动作)

2.将定时器放入一个数据结构中,以便系统能够定时的扫描检查这些定时器是否到期,是否需要触发

3.定期检查定时器是否到期

一般能够想到的最直观的实现定时器的办法,就是在定时器里插入一个绝对时间,然后将定时器放入一个链表,每次轮询时,将该链表整体轮询一遍,从而检查有哪些定时器到期了。那么这样的时间复杂度为:插入定时器 o(1); 每次轮询o(n).

现在的linux内核中,对timer的管理其实是根据timer离当前时间的远近,将其存放到不同的双向队列数组中,每个队列数组长度一样,比如都是2的8次方;(具体是2的几次方,这个还要依赖其他条件,这里只是举个例子)间隔时间小于2的8次方的,存放在第一个双向队列数组里;间隔时间大于等于2的8次方但小于2的16次方的,则存放在第二个双向队列数组里,以此类推;

然后每个2的8次方个周期,对上面的双向队列数组进行重新安排,比如这个时候,需要将第二个数组里的有些队列移动到地一个数组里。这样在实现时,插入定时器o(1),其只需要计算该timer的时间和当前时间的关系,从而计算出应该将其放置第几个数组的第几个双向队列上(同一个数组的某个双向对列上,存放的都是时间一样的timer)即可(基本上是最简单而直接的hash算法);插入定时器o(1),平时轮询时,只要取最近的那个数组里的对应timer即可,时间复杂度为

o(1),每隔指定的周期后(比如2的8次方个周期后)论询一次,这个时候因为要对所有的现存的timer进行调整,最坏时间复杂度是o(n).

而Netty中的Timer管理,则是使用了所谓的Hashed Wheel模式,在数据结构上,其有一个循环数组,这个循环数组有n个bucket(假设为8),可以用来存放Timer链表;Hash Wheel里有2个周期,一个周期是单次轮询的周期,假设为100个Tick;第二个周期是轮询完数组的周期,那么其大小为n*Tick(8*100就为800),假设叫round 周期;那么在每次插入一个timer时,会根据timer的触发时间,计算出其应该存放在哪个bucket的链表中(比如timer距离当前周期为2550tick,那么该Timer就会放在(2550%(n*100Tick))/100Tick=2,这里取celling),在timer插入到bucket中的同时,按照这种算法,相隔round周期整数倍的Timer会存放在相同的bucket的链表中,为了区分round周期,所以每个timer在存放时,还会有一个数值用来村放其距离当前时间还剩多少个round周期(刚才的例子round为3)。当timer插入后,这样轮询的时候就比较方便,轮询时,会有一个cursor,每隔一个周期就加一,论询时该cursor指向数组的哪个bucket,就检查该bucket中的所有timer,主要是检查该timer的round周期是否为0,如果不是0,则将其值减1,然后将其归还(cursor不断增加,当cursor下次指向该bucket时,经过的时间为一个round周期)。而如果round周期已经为0了,则说明该timer需要被触发了,从而触发该timer。这样计算时间复杂度的话,插入的时候,其时间复杂度为o(1),而在轮询的时候,其时间复杂度也可理解为o(1).

Hashed Wheel Timing原理图

比较一下linux内核里的timer管理和netty的timer管理,会发现,如果管理的timer的触发时间都距离当前时间比较近,那么linux和netty的效率应该都差不多(linux这个时候,当间隔的大周期发生的时候,也基本不用调整数组,从而不会发生时间复杂度为o(n)的操作)。而如果管理了较多的距离当前时间很长的timer(此时的linux,会在除了第一个数组外的其他数组里也要存放timer;而netty中,则会在每个bucket的链表中,存放较多的round周期大于0的timer),那么这个时候,linux平时轮询时,;处理的都是确实需要被触发的timer,而netty,很可能会碰到很多不要触发的timer,然后把这些timer的round周期减一,这种情况下,对于平时的轮询,linux是要优于netty;而当碰到大周期时,linux的耗时会超过netty。相当与在这种情形下,netty是把对长时间timer的处理分散在每次轮询中,而linux则是把它集中在一个周期里来做。linux的这种做法,在一般的情景下应该是ok的,但是在realtime os的情况下,应该是有问题的。

相比较与公司自己实现的一个二方库,里面其实也需要用到超时机制,但是实现起来就是很直观,这也没有办法,毕竟大家都时间有限,有的时候还是以快速实现业务为主,没有太多时间把一些技术点考虑的很深远,但是这个时候我们还是可以借鉴一下开源软件中的类似解决方案,拿来为己所用。开源软件能够汇集互联网上很多人的智慧,这可能是很多公司的技术力量都不能够比拟的,这可能也就是开源的魅力所在。

Netty使用初步

1、简介

Java1.4提供了NIO使开发者可以使用Java编写高性能的服务端程序,但使用原生的NIO API就像Linux C中网络编程一样,还是需要做IO处理、协议处理等低层次工作。所以,就像C服务端程序大量使用libevent作为网络应用框架一样,Java社区也不断涌现出基于NIO的网络应用框

架。在这其中,Jboss出品的Netty就是个中翘楚。Netty是个异步的事件驱动网络应用框架,具有高性能、高扩展性等特性。Netty提供了统一的底层协议接口,使得开发者从底层的网络协议(比如TCP/IP、UDP)中解脱出来。就使用来说,开发者只要参考 Netty提供的若干例子和它的指南文档,就可以放手开发基于Netty的服务端程序了。

在Java社区,最知名的开源Java NIO框架要属Mina和Netty,而且两者渊源颇多,对两者的比较自然不少。实际上,Netty的作者原来就是Mina作者之一,所以可以想到,Netty和Mina在设计理念上会有很多共同点。我对Mina没什么研究,但其作者介绍,Netty的设计对开发者有更友好的扩展性,并且性能方面要优于Mina,而Netty完善的文档也很吸引人。所以,如果你在寻找Java NIO框架,Netty是个很不错的选择。本文的内容就是围绕一个demo介绍使用Netty的点点滴滴。

2、服务端程序

2.1、ChannelHandler

服务端程序通常的处理过程是:解码请求数据、业务逻辑处理、编码响应。从框架角度来说,可以提供3个接口来控制并调度该处理过程;从更通用的角度来说,并不特化处理其中的每一步,而把每一步当做过滤器链中的一环,这也是Netty的做法。Netty对请求处理过程实现了过滤器链模式(ChannelPipeline),每个过滤器实现了ChannelHandler接口。Netty中有两种请求事件流类型也做了细分:

1)downstream event:其对应的ChannelHandler子接口是ChannelDownstreamHandler。downstream event是说从头到尾执行ChannelPipeline中的ChannelDownstreamHandler,这一过程相当于向外发送数据的过程。 downstream event有:”write”、”bind”、”unbind”、 “connect”、 “disconnect”、”close”。

2)upstream event:其对应的ChannelHandler子接口是ChannelUpstreamHandler。upstream event处理的事件方向和downstream event相反,这一过程相当于接收处理外来请求的

upstream 、

event

:、

”messageReceived””channelClosed”

、 、 、

“exceptionCaught”“channelBound”

”channelOpen”、

”channelUnbound”

“channelConnected”、”writeComplete”、”channelDisconnected”、”channelInterestChanged”。

Netty中有个注释@interface ChannelPipelineCoverage,它表示被注释的ChannelHandler是否能添加到多个ChannelPipeline中,其可选的值是”all”和”one”。”all”表示ChannelHandler是无状态的,可被多个ChannelPipeline共享,而”one”表示ChannelHandler只作用于单个ChannelPipeline中。但ChannelPipelineCoverage只是个注释而已,并没有实际的检查作用。对于ChannelHandler是”all”还是”one”,还是根据逻辑需要而定。比如,像解码请求handler,因为可能解码的数据不完整,需要等待下一次读事件来了之后再继续解析,所以解码请求handler就需要是”one”的(否则多个Channel共享数据就乱了)。而像业务逻辑处理hanlder通常是”all”的。

下面以一个简单的例子说明如何编写“解码请求数据、业务逻辑处理、编码响应”这一过程中涉及的ChannelHandler。该例子实现的协议格式很简单,请求和响应流中头4个字节表示后面跟的内容长度,根据该长度可得到内容体。

首先看下解码器的实现:

public class MessageDecoder extends FrameDecoder {

@Override

protected Object decode(

ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {

if (buffer.readableBytes() < 4) {

return null;//(1)

}

int dataLength = buffer.getInt(buffer.readerIndex());

if (buffer.readableBytes() < dataLength + 4) {

return null;//(2)

}

buffer.skipBytes(4);//(3)

byte[] decoded = new byte[dataLength];

buffer.readBytes(decoded);

String msg = new String(decoded);//(4)

return msg;

}

}

MessageDecoder继承自FrameDecoder,FrameDecoder是Netty codec包中的辅助类,它是个ChannelUpstreamHandler,decode方法是FrameDecoder子类需要实现的。在上面的代码中,有:

(1)检查ChannelBuffer中的字节数,如果ChannelBuffer可读的字节数少于4,则返回null等待下次读事件。

(2)继续检查ChannelBuffer中的字节数,如果ChannelBuffer可读的字节数少于dataLength + 4,则返回null等待下次读事件。

(3)越过dataLength的字节。

(4)构造解码的字符串返回。

@ChannelPipelineCoverage(\"all\")

public class MessageServerHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(

MessageServerHandler.class.getName());

@Override

public void messageReceived(

ChannelHandlerContext ctx, MessageEvent e) {

if (!(e.getMessage() instanceof String)) {

return;//(1)

}

String msg = (String) e.getMessage();

System.err.println(\"got msg:\"+msg);

e.getChannel().write(msg);//(2)

}

@Override

public void exceptionCaught(

ChannelHandlerContext ctx, ExceptionEvent e) {

logger.log(

Level.WARNING,

\"Unexpected exception from downstream.\",

e.getCause());

e.getChannel().close();

}

}

MessageServerHandler是服务端业务处理handler,其继承自

SimpleChannelUpstreamHandler,并主要实现messageReceived事件。关于该类,有如下注解:

(1)该upstream事件流中,首先经过MessageDecoder,其会将decode返回的解码后的数据构造成 MessageEvent.getMessage(),所以在

handler

上下文关系中,

MessageEvent.getMessage()并不一定都返回ChannelBuffer类型的数据。

(2)MessageServerHandler只是简单的将得到的msg再写回给客户端。e.getChannel().write(msg);操作将触发DownstreamMessageEvent事件,也就是调用下面的MessageEncoder将编码的数据返回给客户端。

@ChannelPipelineCoverage(\"all\")

public class MessageEncoder extends OneToOneEncoder {

@Override

protected Object encode(

ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {

if (!(msg instanceof String)) {

return msg;//(1)

}

String res = (String)msg;

byte[] data = res.getBytes();

int dataLength = data.length;

ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)

buf.writeInt(dataLength);

buf.writeBytes(data);

return buf;//(3)

}

}

MessageEncoder是个ChannelDownstreamHandler。对该类的注解如下:

(1)如果编码的msg不是合法类型,就直接返回该msg,之后OneToOneEncoder会调用 ctx.sendDownstream(evt);来调用下一个ChannelDownstreamHandler。对于该例子来说,这种情况是不应该出现的。

(2)开发者创建ChannelBuffer的用武之地就是这儿了,通常使用dynamicBuffer即可,表示得到的ChannelBuffer可动态增加大小。

(3)返回编码后的ChannelBuffer之后,OneToOneEncoder会调用Channels.write将数据写回客户端。

2.2、MessageServerPipelineFactory

创建了3个ChannelHandler,需要将他们注册到ChannelPipeline,而ChannelPipeline又是和Channel对应的(是全局单例还是每个Channel对应一个ChannelPipeline实例依赖于实现)。可以实现ChannelPipeline的工厂接口 ChannelPipelineFactory实现该目的。MessageServerPipelineFactory的代码如下:

public class MessageServerPipelineFactory implements

ChannelPipelineFactory {

public ChannelPipeline getPipeline() throws Exception {

ChannelPipeline pipeline = pipeline();

pipeline.addLast(\"decoder\", new MessageDecoder());

pipeline.addLast(\"encoder\", new MessageEncoder());

pipeline.addLast(\"handler\", new MessageServerHandler());

return pipeline;

}

}

2.3、MessageServer

服务端程序就剩下启动代码了,使用Netty的ServerBootstrap三下五除二完成之。

public class MessageServer {

public static void main(String[] args) throws Exception {

// Configure the server.

ServerBootstrap bootstrap = new ServerBootstrap(

new NioServerSocketChannelFactory(

Executors.newCachedThreadPool(),

Executors.newCachedThreadPool()));

// Set up the default event pipeline.

bootstrap.setPipelineFactory(new MessageServerPipelineFactory());

// Bind and start to accept incoming connections.

bootstrap.bind(new InetSocketAddress(8080));

}

}

稍加补充的是,该Server程序并不完整,它没有处理关闭时的资源释放,尽管暴力的来看并不一定需要做这样的善后工作。

3、客户端程序

客户端程序和服务端程序处理模型上是很相似的,这里还是付上代码并作简要说明。

3.1、 ChannelHandler

客户端是先发送数据到服务端(downstream事件流),然后是处理从服务端接收的数据(upstream事件流)。这里有个问题是,怎么把需要发送的数据送到downstream事件流里呢?

这就用到了ChannelUpstreamHandler的channelConnected事件了。实现的 MessageClientHandler代码如下:

@ChannelPipelineCoverage(\"all\")

public class MessageClientHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(

MessageClientHandler.class.getName());

@Override

public void channelConnected(

ChannelHandlerContext ctx, ChannelStateEvent e) {

String message = \"hello kafka0102\";

e.getChannel().write(message);

}

@Override

public void messageReceived(

ChannelHandlerContext ctx, MessageEvent e) {

// Send back the received message to the remote peer.

System.err.println(\"messageReceived send message \"+e.getMessage());

try {

Thread.sleep(1000*3);

} catch (Exception ex) { ex.printStackTrace();

}

e.getChannel().write(e.getMessage());

}

@Override

public void exceptionCaught(

ChannelHandlerContext ctx, ExceptionEvent e) {

// Close the connection when an exception is raised.

logger.log(

Level.WARNING,\"Unexpected exception from downstream.\", e.getCause());

e.getChannel().close();

}

}

对于编码和解码Handler,复用MessageEncoder和MessageDecoder即可。

3.2、 MessageClientPipelineFactory

public class MessageClientPipelineFactory implements

ChannelPipelineFactory {

public ChannelPipeline getPipeline() throws Exception {

ChannelPipeline pipeline = pipeline();

pipeline.addLast(\"decoder\", new MessageDecoder());

pipeline.addLast(\"encoder\", new MessageEncoder());

pipeline.addLast(\"handler\", new MessageClientHandler());

return pipeline;

}

}

3.3、MessageClient

public class MessageClient {

public static void main(String[] args) throws Exception {

// Parse options.

String host = \"127.0.0.1\";

int port = 8080;

// Configure the client.

ClientBootstrap bootstrap = new ClientBootstrap(

new NioClientSocketChannelFactory(

Executors.newCachedThreadPool(),

Executors.newCachedThreadPool()));

// Set up the event pipeline factory.

bootstrap.setPipelineFactory(new MessageClientPipelineFactory());

// Start the connection attempt.

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

// Wait until the connection is closed or the connection attempt fails.

future.getChannel().getCloseFuture().awaitUninterruptibly();

// Shut down thread pools to exit.

bootstrap.releaseExternalResources();

}

}

在写客户端例子时,我想像的代码并不是这样的,对客户端的代码我也没做过多的研究,所以也可能没有找到更好的解决方案。在上面的例子中,bootstrap.connect方法中会触发实际的连接操作,接着触发 MessageClientHandler.channelConnected,使整个过程运转起来。但是,我想要的是一个连接池,并且如何写数据也不应该在channelConnected中,这样对于动态的数据,只能在构造函数中传递需要写的数据了。但到现在,我还不清楚如何将连接池和 ChannelPipeline有效的结合起来。或许,这样的需求可以跨过Netty来实现。

4、总结

关于Netty的初步使用,尚且总结到这里。关于这篇文章,写得断断续续,以至于到后来我都没兴趣把内容都整理出来。当然,这多少也是因为我是先整理 Netty原理方面的东西所致。我也只能卑微的期望,该文对Netty入门者会有些许帮助。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- 91gzw.com 版权所有 湘ICP备2023023988号-2

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务