Twitter的核心队列Kestrel使用Netty作为通信模块,从另一个角度证明了Netty的性能和健壮。
Netty是否比MINA强?从底层实现,两者几乎差不多,但Netty的优势是从架构上采用事件通知机制,真正的将异步模式引入来解决各种场景。响应时间可能会加长,但优势在于系统之间的依赖减弱,自身处理能力的决定因素自封闭(瓶颈可以直接根据自身业务处理资源消耗情况估计出来)
我们看看Twitter是怎么用Netty。Twitter很多项目都是用scala写的,scala是很简洁的语言,直接运行在jvm上。可以直接调用Java类。下边的代码都是来自Twitter的核心队列项目Kestrel。这个项目很有意思,可能以后还会讨论,这里先说说怎么用Netty。
NettyHandler.scala是处理Netty网络事件的基类,其他具体协议实现类,MemcacheHandler和TextHandler都继承NettyHandler。NettyHandler应用Netty的ChannelUpStreamHandler接口,这个接口处理上行请求。同时继承KestrelHandler。KestrelHandler处理Kestrel消息队列的行为,包括getItem、setItem等等。
NettyHandler主要方法是handleUpstream。处理上行请求:MessageEvent,ChannelStatEvent,等等。这些实现基本上参照Netty官网给的sample很容易实现。方法不长,才40多行,用scala写出来,有点小清新:)
def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {
event match {
case m: MessageEvent =>
// 具体实现由协议实现类MemcacheHandler等实现
handle(m.getMessage().asInstanceOf[M])
case e: ExceptionEvent =>
// 异常处理
e.getCause() match {
case _: ProtocolError =>
handleProtocolError()
case e: ClosedChannelException =>
finish()
case e: IOException =>
log.debug("I/O Exception on session %d: %s", sessionId, e.toString)
case e =>
log.error(e, "Exception caught on session %d: %s", sessionId, e.toString)
handleException(e)
}
e.getChannel().close()
case s: ChannelStateEvent =>
// 目前状态为connected但statevent.getValue is null,中断连接
if ((s.getState() == ChannelState.CONNECTED) && (s.getValue() eq null)) {
finish()
} else if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {
// 创建连接
channel = s.getChannel()
remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]
if (clientTimeout.isDefined) {
channel.getPipeline.addFirst("idle", new IdleStateHandler(Kestrel.kestrel.timer, 0, 0, clientTimeout.get.inSeconds.toInt))
}
channelGroup.add(channel)
// don't use `remoteAddress.getHostName` because it may do a DNS lookup.
log.debug("New session %d from %s:%d", sessionId, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)
}
case i: IdleStateEvent =>
// 增加idel监控
log.debug("Idle timeout on session %s", channel)
channel.close()
case e =>
// 其他消息继续发出upstream事件
context.sendUpstream(e)
}
}
MemcacheHandler和TextHandler是协议具体的实现。继承NettyHandler。因为Memcached协议比较简单,所以协议实现类就不多说了。阅读这些代码主要的障碍还是在于Java程序员对于某些scala的语法不习惯。我这里介绍个简单但是常用的:Scala的泛型。Scala创始人Martin Odersky曾说过,泛型正是他想要创建Scala语言的最重要因素之一。当然Java1.5以后已经引入了泛型,我们对这个东东已经很熟悉了。看看Twitter怎么使用Scala泛型。比教科书上生动很多。和Java使用<>指定泛型类似,NettyHandler中Scala的泛型M,放在[]里。
abstract class NettyHandler[M](
val channelGroup: ChannelGroup,
queueCollection: QueueCollection,
maxOpenTransactions: Int,
clientTimeout: Option[Duration])
extends KestrelHandler(queueCollection, maxOpenTransactions) with
ChannelUpstreamHandler {
...
def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {
event match {
case m: MessageEvent =>
handle(m.getMessage().asInstanceOf[M])
}
...
}
在NettyHandler中,任何MessageEvent都被转换为泛型M,并交给子类处理。TextHandler和MemcacheHandler是这样给自己的泛型定义的。
class TextHandler( ...) extends NettyHandler[TextRequest](...)
class MemcacheHandler(...) extends NettyHandler[MemcacheRequest](...)
接下来我们自己写一个Scala程序。
Netty服务器压测代码网上有不少版本,基本思路就是实现一个简单的echo handler。还可以添加了一个server主动push的部分。代码用scala实现,可以作为朋友们学习scala的例子。
import org.jboss.netty.channel._
import org.jboss.netty.buffer._
import org.jboss.netty.bootstrap.ServerBootstrap
import java.util._
import java.util.concurrent._
import java.io._
import java.net._
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import scala.collection.mutable
object NettyLoadServer {
def main(args: Array[String]): Unit = {
val testServer = new NettyLoadServer();
testServer.loadTest();
}
}
class NettyLoadServer {
var channel: Channel = null
private var remoteAddress: InetSocketAddress = null
val channels = new mutable.ListBuffer[Channel];
var number = 0;
class LoadTestHandler extends SimpleChannelHandler with ChannelUpstreamHandler {
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)
{
e.getCause().printStackTrace();
channels -= e.getChannel()
e.getChannel().close();
}
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
e.getChannel().write(e.getMessage());
}
override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent) {
e match {
case s: ChannelStateEvent =>
if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {
channel = s.getChannel()
remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]
channels += channel
System.out.println("New session from " + remoteAddress.getAddress.getHostAddress +
":" + remoteAddress.getPort)
}
case e =>
// ignore
}
super.handleUpstream(ctx, e);
}
}
class ChannelManagerThread extends Thread {
override def run() {
while (true) {
try {
System.out.println("channels.size() = " + channels.count(c => c.isInstanceOf[Channel]));
for(s <- channels) {
var cb = new DynamicChannelBuffer(256);
cb.writeBytes("abcd1234".getBytes());
s.write(cb);
}
Thread.sleep(500);
}
catch {
case e => e.printStackTrace();
}
}
}
}
def loadTest() {
try {
val factory = new NioServerSocketChannelFactory(Executors
.newCachedThreadPool(), Executors.newCachedThreadPool());
val bootstrap = new ServerBootstrap(factory);
val handler = new LoadTestHandler();
val pipeline = bootstrap.getPipeline();
pipeline.addLast("loadtest", handler);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
bootstrap.bind(new InetSocketAddress(8007));
val cmt = new ChannelManagerThread();
cmt.start();
}
catch {
case e => e.printStackTrace();
}
}
}
附件里是我的scala sbt工程。
压测client推荐使用Jboss自己的Benchmark:
http://anonsvn.jboss.org/repos/netty/subproject/benchmark/
用ab也可以:
ab -n 20000 -c 20000 -k -t 999999999 -r http://192.168.1.2:8007/
补充:Twitter还有很多很有意思的项目,希望有兴趣的朋友一起来研究学习。
- 大小: 10.5 KB
分享到:
相关推荐
在.NET 6.0上使用Kestrel配置和自定义HTTPS.doc
在ASP.NET Core中,如果在Kestrel中想使用HTTPS对站点进行加密传输,可以按照如下方式 申请证书 这一步就不详细说了,有免费的和收费的,申请完成之后会给你一个*.pfx结尾的文件。 添加NuGet包 nuget中...
本示例可直接运行,方便快速了解Kestrel框架. Kestrel 是包含在 ASP.NET Core 项目模板中的 Web 服务器, .NET Core 支持的所有平台和版本均支持 Kestrel。
资源分类:Python库 所属语言:Python 资源全名:kestrel-lang-1.0.5.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
NULL 博文链接:https://snowolf.iteye.com/blog/1604531
NULL 博文链接:https://snowolf.iteye.com/blog/1605229
addlog-kestrel
NULL 博文链接:https://snowolf.iteye.com/blog/1612207
NULL 博文链接:https://vanadiumlin.iteye.com/blog/1461152
红隼节点Node.js 的 Kestrel 客户端安装 npm install kestrel.node用法 var Kestrel = require ( 'kestrel.node' ) ;var client = new Kestrel ( 'localhost:22133' ) ;// get can optionally take a timeout in ...
在一些开发过程中,会在局域网内搭建webapi服务作为移动端的服务接口使用,但是每次实施人员要到客户现场安装iis等工具,还有一些web的配置,非常繁琐,所以想着把webapi封装到WindowService中,可以通过自定义的...
介绍和背景Kestrel项目涉及使用全自制设计的计算和自我教育的自由,直至从原理图和寄存器传输逻辑一直到OS API和用户教程的各个级别公开记录的硬件和软件。 根据我的经验,它的设计来自多种来源: 硬件工程卓越奖...
Kestrel是不是Unix或Windows的内核。
kestrel项目源文件包
Kestrel(Kotlin 事件溯源) 用于在 Kotlin 中构建基于事件的 CQRS 应用程序的框架。 概括 事件溯源是一种架构范式,其中应用程序状态被建模并存储为在您的应用程序域中有意义的语义事件的不可变序列。 CQRS,命令...
Tormenta 是 Storm 分布式计算机系统的 Scala 扩展包。Tormenta 在 Storm 的 Kafka 和 Kestrel spouts消息 上添加了一个安全类型的包装器。此安全类型允许用户推送映射消息和筛选转换信息到 spout 消息层上去。代码...
python库。 资源全名:kestrel_lang-1.1.0-py3-none-any.whl