博主头像
Kurfuerst

ワクワク

kitex源码阅读(二)

上一篇中,我们梳理了server.Server启动、运行、退出的全流程。我们知道server.Server通过调用remotesvr.Server提供的接口进行数据传输,而不需要关心连接具体如何建立、数据如何封装的问题。这篇我们就来分析一下remotesvr.Server是如何与客户端建立连接并交互的。

remote.ServerTransHandler

与客户端建立连接、传输数据等工作,remotesvr.Serve是调用了remote.TransServer提供的方法实现的:

// remotesvr.Server
type server struct {
    opt      *remote.ServerOption
    listener net.Listener
    transSvr remote.TransServer
    sync.Mutex
}

remote.TransServer又是依赖ServerTransHandler实现的这些功能:

// remote.TransServer
type TransServerFactory interface {
    NewTransServer(opt *ServerOption, transHdlr ServerTransHandler) TransServer
}

// NewTransServer implements the remote.TransServerFactory interface.
func (f *gonetTransServerFactory) NewTransServer(opt *remote.ServerOption, transHdlr remote.ServerTransHandler) remote.TransServer {
    return &transServer{
        opt:       opt,
        transHdlr: transHdlr,
        lncfg:     trans.NewListenConfig(opt),
    }
}

以下是ServerTransHandler提供的方法:

// TransHandler is similar to the handler role in netty
// Transport can be refactored to support pipeline, and then is able to support other extensions at conn level.
type TransHandler interface {
    // read() write()
    TransReadWriter
    // 关闭连接
    OnInactive(ctx context.Context, conn net.Conn)
    // 处理请求出错
    OnError(ctx context.Context, err error, conn net.Conn)
    // 请求处理完成后调用,调用先前构建的中间件调用链进行进一步处理
    OnMessage(ctx context.Context, args, result Message) (context.Context, error)
    SetPipeline(pipeline *TransPipeline)
}

// ServerTransHandler have some new functions.
type ServerTransHandler interface {
    TransHandler
    // 新建连接
    OnActive(ctx context.Context, conn net.Conn) (context.Context, error)
    // 收到请求
    OnRead(ctx context.Context, conn net.Conn) error
}

可见remote.ServerTransHandler是整个流程的核心,与客户端的连接基本是由它来负责的,那接下来就看一下remote.ServerTransHandler创建的过程:

func (s *server) newSvrTransHandler() (handler remote.ServerTransHandler, err error) {
    // 工厂模式创建
    transHdlrFactory := s.opt.RemoteOpt.SvrHandlerFactory
    transHdlr, err := transHdlrFactory.NewTransHandler(s.opt.RemoteOpt)
    if err != nil {
        return nil, err
    }
    // 判断是否实现 InvokeHandleFuncSetter 接口
    if setter, ok := transHdlr.(remote.InvokeHandleFuncSetter); ok {
        // 将服务器已经构建好的、包含所有中间件的最终处理端点 s.eps 设置给传输处理器
        setter.SetInvokeHandleFunc(s.eps)
    }
    // 创建代理类,并将核心处理器 transHdlr 作为管道的基础
    transPl := remote.NewTransPipeline(transHdlr)

    for _, ib := range s.opt.RemoteOpt.Inbounds {
        transPl.AddInboundHandler(ib)
    }
    for _, ob := range s.opt.RemoteOpt.Outbounds {
        transPl.AddOutboundHandler(ob)
    }
    return transPl, nil
}

newSvrTransHandler()返回的是一个代理类TransPipeline。类似装饰器模式,TransPipeline包装了ServerTransHandler,并添加了inboundHdrlsoutboundHdrls两个组件,这样可以在不修改核心处理器transHdlr的情况下,动态的为请求处理流程增加功能,包括限流、连接数控制、日志记录等功能。

TransPipeline类的核心ServerTransHandler是通过工厂模式创建的,我们顺着server初始化的过程一路查下去(newServer() -> NewOptions() -> newServerRemoteOption() -> newServerRemoteOption()),最后可以看到SvrHandlerFactory是通过detection.NewSvrTransHandlerFactory()创建的:

func newServerRemoteOption() *remote.ServerOption {
    return &remote.ServerOption{
        ...
        SvrHandlerFactory:     detection.NewSvrTransHandlerFactory(gonet.NewSvrTransHandlerFactory(), nphttp2.NewSvrTransHandlerFactory(), ttstream.NewSvrTransHandlerFactory()),
        ...
    }
}

// detection.NewSvrTransHandlerFactory
func NewSvrTransHandlerFactory(defaultHandlerFactory remote.ServerTransHandlerFactory,
    detectableHandlerFactory ...remote.ServerTransHandlerFactory,
) remote.ServerTransHandlerFactory {
    return &svrTransHandlerFactory{
        defaultHandlerFactory:    defaultHandlerFactory,
        detectableHandlerFactory: detectableHandlerFactory,
    }
}

kitex将gonet.NewSvrTransHandlerFactory()设置为默认的处理器工厂,而nphttp2ttstream协议的处理器工厂则被指定为可检测协议的处理器(detectableHandler)工厂。

detection.NewSvrTransHandler的作用是在请求到来时,依次使用可检测协议的处理器工厂匹配协议,如果所有detectableHandler都无法识别当前的连接协议,则会用默认的gonet处理器进行处理。这就实现了对多种传输协议的支持。

但这仍然不是具体的实现,我们看gonet.NewSvrTransHandler相关的代码:

type svrTransHandlerFactory struct{}

// NewSvrTransHandlerFactory creates a default go net server transport handler factory.
func NewSvrTransHandlerFactory() remote.ServerTransHandlerFactory {
    return &svrTransHandlerFactory{}
}

// NewTransHandler implements the remote.ServerTransHandlerFactory interface.
func (f *svrTransHandlerFactory) NewTransHandler(opt *remote.ServerOption) (remote.ServerTransHandler, error) {
    return newSvrTransHandler(opt)
}

func newSvrTransHandler(opt *remote.ServerOption) (remote.ServerTransHandler, error) {
    return trans.NewDefaultSvrTransHandler(opt, NewGonetExtension())
}

svrTransHandlerFactory实现了remote.ServerTransHandlerFactory接口,用于创建ServerTransHandler实例。可以看到gonet复用了trans.svrTransHandler,而不像nphttp2ttstream实现了自己的svrTransHandler

接下来就详细看看trans.svrTransHandler的实现。

trans.svrTransHandler

上一篇中我们提到,s.transSvr.BootstrapServer(ln)是真正用于处理请求的循环,而BootstrapServer()又把处理请求的职责交给了serveConn(ctx context.Context, conn net.Conn) (err error)

func (ts *transServer) serveConn(ctx context.Context, conn net.Conn) (err error) {
    ...
    // 初始化
    ctx, err = ts.transHdlr.OnActive(ctx, bc)
    if err != nil {
        klog.CtxErrorf(ctx, "KITEX: OnActive error=%s", err)
        return err
    }
    for {
        ...
        // FIXME: for gRPC transHandler, OnRead should execute only once.
        err = ts.transHdlr.OnRead(ctx, bc)
        if err != nil {
            return err
        }
    }
}

可以看到serveConn()中先使用ts.transHdlr.OnActive(ctx, bc)进行初始化,然后调用了transHdlrOnRead()方法进行处理。

// OnActive implements the remote.ServerTransHandler interface.
func (t *svrTransHandler) OnActive(ctx context.Context, conn net.Conn) (context.Context, error) {
    ctx = remote.WithServiceSearcher(ctx, t.svcSearcher)
    // init rpcinfo
    ri := t.opt.InitOrResetRPCInfoFunc(nil, conn.RemoteAddr())
    return rpcinfo.NewCtxWithRPCInfo(ctx, ri), nil
}

OnActive 方法在每个新连接建立时执行一次,其主要任务是初始化一个与该连接绑定的 RPCInfo 对象(RPCInfo 是 Kitex 框架中用于存储单次 RPC 调用所有相关信息的核心结构,包括配置、统计数据、对端信息等),并将其与 ServiceSearcher一同存入 context 中,确保链路信息能够在整个请求处理周期中传递,为后续在该连接上处理具体的 RPC 请求做好准备。

OnRead()方法是Kitex服务端处理网络连接上新数据到达的核心方法

// OnRead implements the remote.ServerTransHandler interface.
// The connection should be closed after returning error.
func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error) {
    ctx, ri := t.newCtxWithRPCInfo(ctx, conn)
    t.ext.SetReadTimeout(ctx, conn, ri.Config(), remote.Server)
    var recvMsg remote.Message
    var sendMsg remote.Message
    closeConnOutsideIfErr := true
    // 资源回收与错误处理
    defer func() {
        var panicErr error
        // 捕获可能发生的panic
        // 关闭本次请求的链路追踪和性能分析
        // 资源回收
        // 重置RPCInfo对象,以便复用,减少gc压力
         ...
        if rpcinfo.PoolEnabled() {
            t.opt.InitOrResetRPCInfoFunc(ri, conn.RemoteAddr())
        }
        if err != nil && !closeConnOutsideIfErr {
            // when error is not nil, outside will close conn,
            // set err to nil to indicate that this kind of error does not require closing the connection
            err = nil
        }
    }()
    // 开始链路追踪与性能分析
    ctx = t.startTracer(ctx, ri)
    ctx = t.startProfiler(ctx)
    recvMsg = remote.NewMessage(nil, ri, remote.Call, remote.Server)
    // 设置解码器
    recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
    // 从conn中读取请求信息
    ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
    if err != nil {
        t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
        // t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
        return err
    }

    // 判断是否为心跳,是的话就创建心跳回复信息,并跳过业务逻辑处理
    if recvMsg.MessageType() == remote.Heartbeat {
        sendMsg = remote.NewMessage(nil, ri, remote.Heartbeat, remote.Server)
    } else {
        // reply processing
        methodInfo := ri.Invocation().MethodInfo()
        // 判断是否为单向调用,不是的话会生成一个空的result结构体
        if methodInfo.OneWay() {
            sendMsg = remote.NewMessage(nil, ri, remote.Reply, remote.Server)
        } else {
            sendMsg = remote.NewMessage(methodInfo.NewResult(), ri, remote.Reply, remote.Server)
        }

        // 进入处理流程
        ctx, err = t.transPipe.OnMessage(ctx, recvMsg, sendMsg)
        if err != nil {
            // error cannot be wrapped to print here, so it must exec before NewTransError
            t.OnError(ctx, err, conn)
            err = remote.NewTransError(remote.InternalError, err)
            if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false, false); closeConn {
                return err
            }
            // connection don't need to be closed when the error is return by the server handler
            closeConnOutsideIfErr = false
            return
        }
    }

    // 设置编码器
    sendMsg.SetPayloadCodec(t.opt.PayloadCodec)
    // 写回响应
    if ctx, err = t.transPipe.Write(ctx, conn, sendMsg); err != nil {
        // t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
        return err
    }
    return
}

OnRead()方法的总体流程大概是

  1. 从连接中读取数据并解码成一个请求消息 (recvMsg)。
  2. 处理该请求(如果是心跳则准备心跳回复,如果是普通请求则调用业务逻辑)。
  3. 将业务逻辑的执行结果或错误编码成一个响应消息 (sendMsg)。
  4. 将响应消息写回连接。
  5. 在整个过程中进行 tracing(链路追踪)、profiling(性能分析)、错误处理和资源回收。
Read()

OnRead()方法调用了Read()去从连接中读取并解码

// Read implements the remote.ServerTransHandler interface.
func (t *svrTransHandler) Read(ctx context.Context, conn net.Conn, recvMsg remote.Message) (nctx context.Context, err error) {
    var bufReader remote.ByteBuffer
    ri := recvMsg.RPCInfo()
    defer func() {
        // 错误处理
        ...
        // 记录结束事件
        rpcinfo.Record(ctx, ri, stats.ReadFinish, err)
    }()

    // 记录一个 ReadStart 事件。这用于链路追踪和性能统计,标记着从连接读取数据的操作正式开始
    rpcinfo.Record(ctx, ri, stats.ReadStart, nil)

    bufReader = t.ext.NewReadByteBuffer(ctx, conn, recvMsg)
    // 检查是否实现了 remote.MetaDecoder 接口
    if codec, ok := t.codec.(remote.MetaDecoder); ok {
        // 先解码消息头
        if err = codec.DecodeMeta(ctx, recvMsg, bufReader); err == nil {
            if t.opt.Profiler != nil && t.opt.ProfilerTransInfoTagging != nil && recvMsg.TransInfo() != nil {
                var tags []string
                ctx, tags = t.opt.ProfilerTransInfoTagging(ctx, recvMsg)
                ctx = t.opt.Profiler.Tag(ctx, tags...)
            }
            // 解码消息体
            err = codec.DecodePayload(ctx, recvMsg, bufReader)
        }
    } else {
        // 不支持MetaDecoder则进行单阶段解码
        err = t.codec.Decode(ctx, recvMsg, bufReader)
    }
    if err != nil {
        // 出现错误会给recvMsg打上一个ReadFail的标签,用于后续的错误处理
        recvMsg.Tags()[remote.ReadFailed] = true
        return ctx, err
    }
    return ctx, nil
}

Read()方法会先从 Extension (t.ext)获取一个用于读取数据的 ByteBuffer,
随后会判断t.codec是否实现了remote.MetaDecode接口,这是一种更优的解码方式,支持将消息头(Meta)和消息体(Payload)分开解码。实现了则使用 DecodeMeta 和 DecodePayload 进行两阶段解码,先解析消息头,再解析消息体。如果 Codec 不支持,则回退到使用 Decode 方法一次性解码整个消息。

注意到在获取buffer之前,Read()先调用了rpcinfo.Record()方法,而在用于处理异常的buffer块中也调用了同样的方法,这个方法会记录读取开始与结束事件,用于链路追踪与性能统计。

// Record records the event to RPCStats.
func Record(ctx context.Context, ri RPCInfo, event stats.Event, err error) {
    if ctx == nil {
        return
    }
    st := ri.Stats()
    if st == nil {
        return
    }
    if err != nil {
        st.Record(ctx, event, stats.StatusError, err.Error())
    } else {
        st.Record(ctx, event, stats.StatusInfo, "")
    }
}

// Record implements the RPCStats interface.
// It only record once for each event, it will ignore follow events with the same Index()
func (r *rpcStats) Record(ctx context.Context, e stats.Event, status stats.Status, info string) {
    if e.Level() > r.level {
        return
    }
    idx := e.Index()
    p := &r.eventStatus[idx]
    if atomic.CompareAndSwapUint32(p, eventUnset, eventUpdating) ||
        (r.copied && atomic.CompareAndSwapUint32(p, eventStale, eventUpdating)) {
        r.eventMap[idx] = event{event: e, status: status, info: info, time: time.Now()}
        atomic.StoreUint32(p, eventRecorded) // done, make it visible to GetEvent
    } else {
        // eventRecorded? panic?
    }
}

这个辅助函数最终会调用 RPCStats 接口的 Record()方法。这个方法会使用 atomic.CompareAndSwapUint32 确保对于同一种 event(由 event.Index() 标识),只记录一次。这可以防止在重试等复杂场景下重复记录同一个事件点。当成功获取记录权限后,它会创建一个 event 结构体,包含事件类型、状态、信息和当前时间戳 time.Now(),并将其存储在 rpcStats 的 eventMap 数组中。

  • 关于atomic.CompareAndSwapUint32:这是一个“比较并交换”操作,是实现无锁并发编程的核心原语之一。它的行为可以分解为以下三个步骤,但这三个步骤是作为一个不可分割的原子操作完成的:
    读取::读取 addr 指针指向的内存地址的当前值。
    比较:将读取到的当前值与 old 参数进行比较。
    交换:如果 当前值等于 old,那么就将 new 的值写入到 addr 指向的地址,并返回 true。

    如果 当前值不等于 old(意味着在当前 goroutine 读取和尝试写入的间隙,有其他 goroutine 修改了这个值),那么什么也不做,直接返回 false。
    

Record()代码中,atomic.CompareAndSwapUint32是为了防止在重试等复杂场景下重复记录同一个事件点。

if atomic.CompareAndSwapUint32(p, eventUnset, eventUpdating) || ...

当一个 goroutine 调用 Record 时,它会尝试将事件状态从 eventUnset 原子地更新为 eventUpdating。如果 CompareAndSwapUint32 返回 true,说明它是第一个成功修改此事件状态的 goroutine。它就获得了“记录权”,可以继续执行 if 块内的逻辑来记录事件的详细信息。如果返回 false,说明在它执行 CAS 操作之前,另一个 goroutine 已经将状态从 eventUnset 修改掉了。当前 goroutine 就知道这个事件已经被记录过了,不会进入 if 块,从而避免了重复记录

(r.copied && atomic.CompareAndSwapUint32(p, eventStale, eventUpdating)

这个判断逻辑用于 RPC 重试场景:先通过r.copied检查当前 rpcStats 对象是否是从上一次失败的请求中复制过来的,如果是复制的,那么某些事件(如 RPCStart)可能从上一次请求中被带了过来,其状态会被标记为 eventStale(陈旧的)。atomic.CompareAndSwapUint32(p, eventStale, eventUpdating)这个 CAS 操作尝试将“陈旧”状态更新为“正在更新”。如果操作成功,说明这是一个重试请求,并且我们现在要用本次重试的新事件数据覆盖掉上次请求的旧数据。goroutine 同样获得了记录权。

OnMessage()

OnMessage()方法负责进行消息处理

// OnMessage implements the InboundHandler interface.
func (p *TransPipeline) OnMessage(ctx context.Context, args, result Message) (context.Context, error) {
    var err error
    for _, h := range p.inboundHdrls {
        ctx, err = h.OnMessage(ctx, args, result)
        if err != nil {
            return ctx, err
        }
    }
    if result.MessageType() == Exception {
        return ctx, nil
    }
    return p.netHdlr.OnMessage(ctx, args, result)
}

一个请求消息被成功从网络连接中读取并解码后,OnMessage()会对其进行处理。它通过调用链,将消息依次传递给一系列注册的入站处理器(Inbound Handlers),最终再交给核心的网络处理器(netHdlr)来执行业务逻辑。

在执行完所有前置的 inboundHdrls 后,代码会检查 result 消息的类型。如果某个前置处理器(比如认证处理器)发现请求不合法,它不会返回 error(因为error通常表示连接级错误),而是会将 result 消息的类型设置为 Exception 并填充错误信息。这个检查就是为了处理这种情况:如果 result已经被标记为异常,就意味着请求已经被处理(判定为失败),无需再执行后续的业务逻辑。

Write()

最后是Write()方法,负责写回响应

// Write implements the OutboundHandler interface.
func (p *TransPipeline) Write(ctx context.Context, conn net.Conn, sendMsg Message) (nctx context.Context, err error) {
    for _, h := range p.outboundHdrls {
        ctx, err = h.Write(ctx, conn, sendMsg)
        if err != nil {
            return ctx, err
        }
    }
    return p.netHdlr.Write(ctx, conn, sendMsg)
}

// Write implements the remote.ServerTransHandler interface.
func (t *svrTransHandler) Write(ctx context.Context, conn net.Conn, sendMsg remote.Message) (nctx context.Context, err error) {
    var bufWriter remote.ByteBuffer
    ri := sendMsg.RPCInfo()
    rpcinfo.Record(ctx, ri, stats.WriteStart, nil)
    defer func() {
        t.ext.ReleaseBuffer(bufWriter, err)
        rpcinfo.Record(ctx, ri, stats.WriteFinish, err)
    }()

    // 判断是否是单向调用
    if methodInfo := ri.Invocation().MethodInfo(); methodInfo != nil && methodInfo.OneWay() {
        return ctx, nil
    }

    bufWriter = t.ext.NewWriteByteBuffer(ctx, conn, sendMsg)
    err = t.codec.Encode(ctx, sendMsg, bufWriter)
    if err != nil {
        return ctx, err
    }
    return ctx, bufWriter.Flush()
}

大概流程是先记录写入开始事件 (WriteStart),然后判断是否为单向调用。单向调用意味着客户端只管发送请求,不关心也不等待服务端的响应。如果是单向调用,服务端则无需执行任何写入操作,直接成功返回,从而节省了编码和网络 I/O 的开销。如果不是则从 Extension 获取一个优化的 ByteBuffer 用于写入,并使用 CodecMessage 对象编码成字节流,存入 ByteBuffer。最后调用 ByteBufferFlush 方法,将编码后的数据通过网络连接发送给客户端。

kitex源码阅读(二)
http://blog.kurfuerst.online/index.php/archives/39/
本文作者 Großer Kurfürst
发布时间 2025-10-27
许可协议 CC BY-NC-SA 4.0
发表新评论