thrift 协议(TProtocol)

 协议大家平时都会遇到,只是没有特别注意。
 像平时大家阅读文章的时候都是从上到下、从左往右 按行阅读,这可以看做一种阅读协议。 ( 备注: 古人在竹简上写的文字则是从上往下、从右往左 按列阅读。)
 更详细的规则比如:作文的第一行是标题,段首要空两格的是一个自然段,遇到一个句号是一句话。

详细的标点符号用法(通信协议)参考教育部的规范 标点符号用法 - 教育部

在计算机远程方法调用时,传输的都是二进制的01,调用方(写数据)和被调用方(读数据)怎么约定通信协议的?

(1) 协议(TProtocol)的作用

协议的作用就类似于文字中的符号,作为应用拆解请求消息的边界,保证二进制数据经过网络传输后,还能被正确地还原语义。

具体点就是从二进制数据中解析出协议版本方法名消息类型序列Id序列化方式消息长度协议体等内容。

thrift-protocol-message

协议在thrift中的作用如图中绿色部分所示,实现了RPC里通信协议的功能。
thrift-layers

thrift中协议层规定了传输协议的规范,

Thrift支持让用户选择客户端与服务端之间传输通信协议,在传输协议上总体划分为文本(text)和二进制(binary)传输协议。详细分类如下:

TBinaryProtocol 二进制编码格式进行数据传输
TCompactProtocol 高效率、密集的二进制编码格式进行数据传输,会对数据进行压缩
TDebugProtocol
THeaderProtocol
TJSONProtocol 使用JSON的数据编码协议进行数据传输
TMultiplexedProtocol
TSimpleJSONProtocol 只提供JSON只写的协议,使用与通过脚本语言解析

(1.1) thrift协议设计

thrift协议里主要包含 Message、Struct、Field、Data 等内容。

(1.1.1) Message设计

      <message> ::= <message-begin> <struct> <message-end>

<message-begin> ::= <method-name> <message-type> <message-seqid>

  <method-name> ::= STRING

 <message-type> ::= T_CALL | T_REPLY | T_EXCEPTION | T_ONEWAY

<message-seqid> ::= I32

Message里规定了 方法名消息类型序列Id消息序列Id的顺序、占用空间大小和解析规则。 如下图

thrift-protocol-message

Binary protocol Message, strict encoding, 12+ bytes:
+--------+--------+--------+--------+--------+--------+--------+--------+--------+...+--------+--------+--------+--------+--------+
|1vvvvvvv|vvvvvvvv|unused  |00000mmm| name length                       | name                | seq id                            |
+--------+--------+--------+--------+--------+--------+--------+--------+--------+...+--------+--------+--------+--------+--------+

(1.1.2) Struct设计

      <struct> ::= <struct-begin> <field>* <field-stop> <struct-end>

<struct-begin> ::= <struct-name>

 <struct-name> ::= STRING

  <field-stop> ::= T_STOP

       <field> ::= <field-begin> <field-data> <field-end>

struct是Message的主要内容,里面对应idl里的n个字段,结构如图所示

thrift-protocol-struct


(1.1.3) Field设计

      <field> ::= <field-begin> <field-data> <field-end>

<field-begin> ::= <field-name> <field-type> <field-id>

 <field-name> ::= STRING

 <field-type> ::= T_BOOL | T_BYTE | T_I8 | T_I16 | T_I32 | T_I64 | T_DOUBLE
                  | T_STRING | T_BINARY | T_STRUCT | T_MAP | T_SET | T_LIST
                  | T_UUID

   <field-id> ::= I16

 <field-data> ::= I8 | I16 | I32 | I64 | DOUBLE | STRING | BINARY
                  <struct> | <map> | <list> | <set>

thrift-protocol-field


(1.1.4) 基础类型设计

           <map> ::= <map-begin> <field-datum>* <map-end>

     <map-begin> ::= <map-key-type> <map-value-type> <map-size>

  <map-key-type> ::= <field-type>

<map-value-type> ::= <field-type>

      <map-size> ::= I32

          <list> ::= <list-begin> <field-data>* <list-end>

    <list-begin> ::= <list-elem-type> <list-size>

<list-elem-type> ::= <field-type>

     <list-size> ::= I32

           <set> ::= <set-begin> <field-data>* <set-end>

     <set-begin> ::= <set-elem-type> <set-size>

 <set-elem-type> ::= <field-type>

      <set-size> ::= I32

基础类型主要分2类,
一类是基础的数字类型,直接使用数字表示即可,占用1、2、3、4字节,
一类是字符串类型
一类是二进制数据,也就是字节数组
还有一类是集合类型,像map、list、set,稍微复杂点

thrift-protocol-data-type


(1.1.5) thrift协议设计全景图

Message、Struct、Field、Data 组合后如下图所示

thrift-protocol-all

thrift-protocol-spec

       <message> ::= <message-begin> <struct> <message-end>

 <message-begin> ::= <method-name> <message-type> <message-seqid>

   <method-name> ::= STRING

  <message-type> ::= T_CALL | T_REPLY | T_EXCEPTION | T_ONEWAY

 <message-seqid> ::= I32

        <struct> ::= <struct-begin> <field>* <field-stop> <struct-end>

  <struct-begin> ::= <struct-name>

   <struct-name> ::= STRING

    <field-stop> ::= T_STOP

         <field> ::= <field-begin> <field-data> <field-end>

   <field-begin> ::= <field-name> <field-type> <field-id>

    <field-name> ::= STRING

    <field-type> ::= T_BOOL | T_BYTE | T_I8 | T_I16 | T_I32 | T_I64 | T_DOUBLE
                     | T_STRING | T_BINARY | T_STRUCT | T_MAP | T_SET | T_LIST
                     | T_UUID

      <field-id> ::= I16

    <field-data> ::= I8 | I16 | I32 | I64 | DOUBLE | STRING | BINARY
                     <struct> | <map> | <list> | <set>

           <map> ::= <map-begin> <field-datum>* <map-end>

     <map-begin> ::= <map-key-type> <map-value-type> <map-size>

  <map-key-type> ::= <field-type>

<map-value-type> ::= <field-type>

      <map-size> ::= I32

          <list> ::= <list-begin> <field-data>* <list-end>

    <list-begin> ::= <list-elem-type> <list-size>

<list-elem-type> ::= <field-type>

     <list-size> ::= I32

           <set> ::= <set-begin> <field-data>* <set-end>

     <set-begin> ::= <set-elem-type> <set-size>

 <set-elem-type> ::= <field-type>

      <set-size> ::= I32

thrift-binary-protocol


(1.2) thrift通信协议的优缺点

优点:
1、协议特别紧凑

缺点
1、二进制协议(TBinaryProtocol) 不支持多版本,需要使用方从业务层面


(2) thrift调用demo

Java代码 https://github.com/weikeqin/thrift-tutorial-java-demo
Go代码 https://github.com/weikeqin/thrift-tutorial-go-demo

(2.1) idl

namespace java tutorial
namespace go tutorial


service Calculator extends shared.SharedService {

   // 计算两数之和
   i32 add(1:i32 num1, 2:i32 num2),

}
namespace java tutorial
namespace go tutorial

struct SharedStruct {
  1: i32 key
  2: string value
}

// 
service SharedService {
  SharedStruct getStruct(1: i32 key)
}

idl生成的go代码

thrift -r --gen go service.thrift
type Calculator interface {
  shared.SharedService

  // Parameters:
  //  - Num1
  //  - Num2
  Add(ctx context.Context, num1 int32, num2 int32) (_r int32, _err error)

}
//  

type CalculatorClient struct {
  *shared.SharedServiceClient
}
//

type SharedServiceClient struct {
  c thrift.TClient
  meta thrift.ResponseMeta
}
// Parameters:
//  - Num1
//  - Num2
func (p *CalculatorClient) Add(ctx context.Context, num1 int32, num2 int32) (_r int32, _err error) {
  // 入参
  var _args3 CalculatorAddArgs
  _args3.Num1 = num1
  _args3.Num2 = num2
  // 声明出参
  var _result5 CalculatorAddResult
  var _meta4 thrift.ResponseMeta
  // 调用方法
  _meta4, _err = p.Client_().Call(ctx, "add", &_args3, &_result5)
  // 设置header
  p.SetLastResponseMeta_(_meta4)
  if _err != nil {
    return
  }
  // 返回结果
  return _result5.GetSuccess(), nil
}
// 写入参Struct 
func (p *CalculatorAddArgs) Write(ctx context.Context, oprot thrift.TProtocol) error {
  // 写Struct开始标记
  if err := oprot.WriteStructBegin(ctx, "add_args"); err != nil {
    return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
  if p != nil {
	// 这儿入参有几个字段就写几个字段 
	// 写字段1  
	if err := p.writeField1(ctx, oprot); err != nil { return err }
	// 写字段2
    if err := p.writeField2(ctx, oprot); err != nil { return err }
  }
  // 写字段停止
  if err := oprot.WriteFieldStop(ctx); err != nil {
	return thrift.PrependError("write field stop error: ", err) }
  // 写字段结束标记
  if err := oprot.WriteStructEnd(ctx); err != nil {
    return thrift.PrependError("write struct stop error: ", err) }
  return nil
}
func (p *CalculatorAddArgs) writeField1(ctx context.Context, oprot thrift.TProtocol) (err error) {
  // 写字段名
  if err := oprot.WriteFieldBegin(ctx, "num1", thrift.I32, 1); err != nil {
	return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:num1: ", p), err) }
  // 写字段值	
  if err := oprot.WriteI32(ctx, int32(p.Num1)); err != nil {
  return thrift.PrependError(fmt.Sprintf("%T.num1 (1) field write error: ", p), err) }
  // 写字段结束标记
  if err := oprot.WriteFieldEnd(ctx); err != nil {
    return thrift.PrependError(fmt.Sprintf("%T write field end error 1:num1: ", p), err) }
  return err
}
func (p *CalculatorAddArgs) writeField2(ctx context.Context, oprot thrift.TProtocol) (err error) {
  // 写字段名
  if err := oprot.WriteFieldBegin(ctx, "num2", thrift.I32, 2); err != nil {
	return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:num2: ", p), err) }
  // 写字段值
  if err := oprot.WriteI32(ctx, int32(p.Num2)); err != nil {
  return thrift.PrependError(fmt.Sprintf("%T.num2 (2) field write error: ", p), err) }
  // 写字段结束标记
  if err := oprot.WriteFieldEnd(ctx); err != nil {
    return thrift.PrependError(fmt.Sprintf("%T write field end error 2:num2: ", p), err) }
  return err
}

thrift生成java代码

thrift -r --gen java service.thrift

(3) thrift go源码解析

thrift go源码版本 0.16.0
https://github.com/apache/thrift/tree/0.16.0/lib/go/thrift

TProtocol定义了基本的协议信息,包括传输什么数据,如何解析传输的数据的基本方法。

TProtocol有多种实现,包括 TBinaryProtocol、TCompactProtocol、TDebugProtocol、THeaderProtocol、TJSONProtocol、TMultiplexedProtocol、TSimpleJSONProtocol

(3.1) thrift支持的数据类型

package thrift

// Type constants in the Thrift protocol
type TType byte

const (
	STOP   = 0
	VOID   = 1
	BOOL   = 2
	BYTE   = 3
	I08    = 3
	DOUBLE = 4
	I16    = 6
	I32    = 8
	I64    = 10
	STRING = 11
	UTF7   = 11
	STRUCT = 12
	MAP    = 13
	SET    = 14
	LIST   = 15
	UTF8   = 16
	UTF16  = 17
	//BINARY = 18   wrong and unusued
)

(3.2) TProtocol定义

// package thrift
type TProtocol interface {
	WriteMessageBegin(name string, typeId TMessageType, seqid int32) error
	WriteMessageEnd() error

	WriteStructBegin(name string) error
	WriteStructEnd() error

	WriteFieldBegin(name string, typeId TType, id int16) error
	WriteFieldEnd() error
	WriteFieldStop() error

	WriteMapBegin(keyType TType, valueType TType, size int) error
	WriteMapEnd() error

	WriteListBegin(elemType TType, size int) error
	WriteListEnd() error

	WriteSetBegin(elemType TType, size int) error
	WriteSetEnd() error

	WriteBool(value bool) error
	WriteByte(value int8) error
	WriteI16(value int16) error
	WriteI32(value int32) error
	WriteI64(value int64) error
	WriteDouble(value float64) error
	WriteString(value string) error
	WriteBinary(value []byte) error

	ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error)
	ReadMessageEnd() error

	ReadStructBegin() (name string, err error)
	ReadStructEnd() error

	ReadFieldBegin() (name string, typeId TType, id int16, err error)
	ReadFieldEnd() error

	ReadMapBegin() (keyType TType, valueType TType, size int, err error)
	ReadMapEnd() error

	ReadListBegin() (elemType TType, size int, err error)
	ReadListEnd() error

	ReadSetBegin() (elemType TType, size int, err error)
	ReadSetEnd() error

	ReadBool() (value bool, err error)
	ReadByte() (value int8, err error)
	ReadI16() (value int16, err error)
	ReadI32() (value int32, err error)
	ReadI64() (value int64, err error)
	ReadDouble() (value float64, err error)
	ReadString() (value string, err error)
	ReadBinary() (value []byte, err error)

	Skip(fieldType TType) (err error)
	Flush() (err error)

	Transport() TTransport
}

服务器端如何知道客户端发送过来的数据是怎么组合的,比如第一个字段是字符串类型,第二个字段是int。这个信息是在IDL生成客户端时生成的代码时提供了。Thrift生成的客户端代码提供了读写参数的方法,这两个方式是一一对应的,包括字段的序号,类型等等。客户端使用写参数的方法,服务器端使用读参数的方法。

  1. 方法的调用从writeMessageBegin开始,发送了消息头信息
  2. 写方法的参数,也就是写消息体。方法参数由一个统一的接口TBase描述,提供了read和write的统一接口。自动生成的代码提供了read, write方法参数的具体实现
  3. 写完结束

(3.3) TBinaryProtocol实现

TBinaryProtocol

(3.3.1) TBinaryProtocol-go实现

type TBinaryProtocol struct {
	trans         TRichTransport  // 增强(包装)传输对象
	origTransport TTransport      // 传输对象 封装IO层
	cfg           *TConfiguration // 传输配置 
	buffer        [64]byte        // 缓冲
}

WriteMessageBegin

/**
 * Writing Methods
 */
 // param name 消息名称 对应方法名  "GetUserById"
// param typeId 消息类型ID  thrift.CALL
// param seqId 序列号 自增  
func (p *TBinaryProtocol) WriteMessageBegin(ctx context.Context, name string, typeId TMessageType, seqId int32) error {
	if p.cfg.GetTBinaryStrictWrite() { // 直接写入
		// 计算版本
		// 十六进制:80010000  转成二进制:10000000000000010000000000000000 
		// TMessageType 是int32类型,但实际上目前只用了[0,4]的5个值 也就是只用到了int32类型的低3位(低3位可以表示8个值) 
		version := uint32(VERSION_1) | uint32(typeId)
		// 写入版本 // version 长度32-bit 4字节
		e := p.WriteI32(ctx, int32(version))
		if e != nil {
			return e
		}
		// 写入方法名  当前这个例子里指"GetUserById"
		e = p.WriteString(ctx, name)
		if e != nil {
			return e
		}
		// 写入序列Id  32位 4字节
		e = p.WriteI32(ctx, seqId)
		return e
	} else {
		// 写入方法名  当前这个例子里指"GetUserById"
		e := p.WriteString(ctx, name)
		if e != nil {
			return e
		}
		// 写入类型id typeId是thrift定义的消息类型Id  int32类型 4字节
		e = p.WriteByte(ctx, int8(typeId))
		if e != nil {
			return e
		}
		// 写入序列Id  32位 4字节
		e = p.WriteI32(ctx, seqId)
		return e
	}
	return nil
}

WriteMessageEnd

func (p *TBinaryProtocol) WriteMessageEnd(ctx context.Context) error {
	return nil
}

WriteStructBegin

func (p *TBinaryProtocol) WriteStructBegin(ctx context.Context, name string) error {
	return nil
}

WriteStructEnd

func (p *TBinaryProtocol) WriteStructEnd(ctx context.Context) error {
	return nil
}

WriteFieldBegin

// param name 字段名
// param typeId 字段类型
// param id  字段对应idl里的id
func (p *TBinaryProtocol) WriteFieldBegin(ctx context.Context, name string, typeId TType, id int16) error {
	// 写字段类型 8位 也就是1字节
	e := p.WriteByte(ctx, int8(typeId))
	if e != nil {
		return e
	}
	// 写入idl里的id  16位 2字节
	e = p.WriteI16(ctx, id)
	return e
}

WriteFieldEnd

func (p *TBinaryProtocol) WriteFieldEnd(ctx context.Context) error {
	return nil
}

WriteMapBegin

// keyType key类型 
// valueType value类型
// size map大小
func (p *TBinaryProtocol) WriteMapBegin(ctx context.Context, keyType TType, valueType TType, size int) error {
	// 写入key类型  8位 1字节
	e := p.WriteByte(ctx, int8(keyType))
	if e != nil {
		return e
	}
	// 写入value类型  8位 1字节
	e = p.WriteByte(ctx, int8(valueType))
	if e != nil {
		return e
	}
	// 写入map长度  32位  也就是4字节
	e = p.WriteI32(ctx, int32(size))
	return e
}

WriteMapEnd

func (p *TBinaryProtocol) WriteMapEnd(ctx context.Context) error {
	return nil
}

WriteListBegin

func (p *TBinaryProtocol) WriteListBegin(ctx context.Context, elemType TType, size int) error {
	// 写入类型  8位 1字节
	e := p.WriteByte(ctx, int8(elemType))
	if e != nil {
		return e
	}
	// 写入list长度  32位  4字节
	e = p.WriteI32(ctx, int32(size))
	return e
}

WriteListEnd

func (p *TBinaryProtocol) WriteListEnd(ctx context.Context) error {
	return nil
}

WriteSetBegin

// param 
// param elemType 
// param size
func (p *TBinaryProtocol) WriteSetBegin(ctx context.Context, elemType TType, size int) error {
	// 写入thrift类型 8位 1字节
	e := p.WriteByte(ctx, int8(elemType))
	if e != nil {
		return e
	}
	// 写入set长度
	e = p.WriteI32(ctx, int32(size))
	return e
}

WriteSetEnd

func (p *TBinaryProtocol) WriteSetEnd(ctx context.Context) error {
	return nil
}

 

WriteBool

func (p *TBinaryProtocol) WriteBool(ctx context.Context, value bool) error {
	if value {
		// 使用WriteByte  长度8-bit 1字节
		return p.WriteByte(ctx, 1)
	}
	// 使用WriteByte  长度8-bit 1字节
	return p.WriteByte(ctx, 0)
}

WriteByte

// param ctx
// param value 写入的值 
func (p *TBinaryProtocol) WriteByte(ctx context.Context, value int8) error {
	// 写入byte类型值  长度8-bit 1字节
	e := p.trans.WriteByte(byte(value))
	return NewTProtocolException(e)
}

WriteI16

// param value 写入的值 
func (p *TBinaryProtocol) WriteI16(ctx context.Context, value int16) error {
	v := p.buffer[0:2]
	// 把int16转成byte数组 长度16-bit 2字节
	binary.BigEndian.PutUint16(v, uint16(value))
	_, e := p.trans.Write(v)
	return NewTProtocolException(e)
}

WriteI32

// param ctx
// param value 写入的值 
func (p *TBinaryProtocol) WriteI32(ctx context.Context, value int32) error {
	v := p.buffer[0:4]
	// 把int32转成byte数组   长度32-bit 4字节
	binary.BigEndian.PutUint32(v, uint32(value))
	_, e := p.trans.Write(v)
	return NewTProtocolException(e)
}

WriteI64

// 
// param value 写入的值 
func (p *TBinaryProtocol) WriteI64(ctx context.Context, value int64) error {
	// 声明一个8字节的数组
	v := p.buffer[0:8]
	// 把int64转成byte数组  长度64bit 8字节
	binary.BigEndian.PutUint64(v, uint64(value))
	_, err := p.trans.Write(v)
	return NewTProtocolException(err)
}

WriteDouble

func (p *TBinaryProtocol) WriteDouble(ctx context.Context, value float64) error {
	// 先把float64转成int64 然后再调WriteI64 
	return p.WriteI64(ctx, int64(math.Float64bits(value)))
}

WriteString

func (p *TBinaryProtocol) WriteString(ctx context.Context, value string) error {
	// 写入字符串长度
	e := p.WriteI32(ctx, int32(len(value)))
	if e != nil {
		return e
	}
	// 写入字符串
	_, err := p.trans.WriteString(value)
	return NewTProtocolException(err)
}

WriteBinary

func (p *TBinaryProtocol) WriteBinary(ctx context.Context, value []byte) error {
	// 写入byte数组的长度
	e := p.WriteI32(ctx, int32(len(value)))
	if e != nil {
		return e
	}
	// 写入byte数组
	_, err := p.trans.Write(value)
	return NewTProtocolException(err)
}

ReadMessageBegin

// param ctx 
// return name 
// return typeId 
// return seqId
// return error 
func (p *TBinaryProtocol) ReadMessageBegin(ctx context.Context) (name string, typeId TMessageType, seqId int32, err error) {
	// 读取前32位 也就是前4个字节 
	// 根据读取到的size大小来判断,
	// 如果size<0
	// 如果size>0,读取到的size大小就是字符串的长度
	size, e := p.ReadI32(ctx)
	if e != nil {
		return "", typeId, 0, NewTProtocolException(e)
	}
	// 只有直接写入时,读取时读取到的size才会<0
	if size < 0 {
		// 类型Id  写入时通过或运算  读取时通过与运算  
		typeId = TMessageType(size & 0x0ff)
		// 版本
		version := int64(int64(size) & VERSION_MASK)
		if version != VERSION_1 {
			return name, typeId, seqId, NewTProtocolExceptionWithType(BAD_VERSION, fmt.Errorf("Bad version in ReadMessageBegin"))
		}
		// 获取方法名
		name, e = p.ReadString(ctx)
		if e != nil {
			return name, typeId, seqId, NewTProtocolException(e)
		}
		// 读取32位 也就是4字节 获取序列Id
		seqId, e = p.ReadI32(ctx)
		if e != nil {
			return name, typeId, seqId, NewTProtocolException(e)
		}
		return name, typeId, seqId, nil
	}
	if p.cfg.GetTBinaryStrictRead() {
		return name, typeId, seqId, NewTProtocolExceptionWithType(BAD_VERSION, fmt.Errorf("Missing version in ReadMessageBegin"))
	}
	// 根据size大小读取字符串  获取方法名
	name, e2 := p.readStringBody(size)
	if e2 != nil {
		return name, typeId, seqId, e2
	}
	// 读取类型typeId  对应thrift定义的类型
	b, e3 := p.ReadByte(ctx)
	if e3 != nil {
		return name, typeId, seqId, e3
	}
	typeId = TMessageType(b)
	// 读序Id 32位 4字节
	seqId, e4 := p.ReadI32(ctx)
	if e4 != nil {
		return name, typeId, seqId, e4
	}
	return name, typeId, seqId, nil
}

ReadMessageEnd

func (p *TBinaryProtocol) ReadMessageEnd(ctx context.Context) error {
	return nil
}

ReadStructBegin

func (p *TBinaryProtocol) ReadStructBegin(ctx context.Context) (name string, err error) {
	return
}

ReadStructEnd

func (p *TBinaryProtocol) ReadStructEnd(ctx context.Context) error {
	return nil
}

ReadFieldBegin

// 
func (p *TBinaryProtocol) ReadFieldBegin(ctx context.Context) (name string, typeId TType, seqId int16, err error) {
	// 获取对应的thrift字段类型  读取前8位 也就是1字节  
	t, err := p.ReadByte(ctx)
	typeId = TType(t)
	if err != nil {
		return name, typeId, seqId, err
	}
	if t != STOP {
		// 获取idl里设置的id   16位 2字节  
		seqId, err = p.ReadI16(ctx)
	}
	// 返回的name是空的 
	return name, typeId, seqId, err
}

ReadFieldEnd

func (p *TBinaryProtocol) ReadFieldEnd(ctx context.Context) error {
	return nil
}

ReadMapBegin

func (p *TBinaryProtocol) ReadMapBegin(ctx context.Context) (kType, vType TType, size int, err error) {
	// 读取8位 也就是1字节
	k, e := p.ReadByte(ctx)
	if e != nil {
		err = NewTProtocolException(e)
		return
	}
	// key对应的thrift类型
	kType = TType(k)

	// 读取8位 也就是1字节
	v, e := p.ReadByte(ctx)
	if e != nil {
		err = NewTProtocolException(e)
		return
	}
	// value对应的thrift类型
	vType = TType(v)

	// 读取32位 也就是4字节
	size32, e := p.ReadI32(ctx)
	if e != nil {
		err = NewTProtocolException(e)
		return
	}
	err = checkSizeForProtocol(size32, p.cfg)
	if err != nil {
		return
	}
	size = int(size32)
	// key字段类型 value字段类型  map的长度
	return kType, vType, size, nil
}

ReadMapEnd

func (p *TBinaryProtocol) ReadMapEnd(ctx context.Context) error {
	return nil
}

ReadListBegin

// 
func (p *TBinaryProtocol) ReadListBegin(ctx context.Context) (elemType TType, size int, err error) {
	// 读取8位 也就是1字节
	b, e := p.ReadByte(ctx)
	if e != nil {
		err = NewTProtocolException(e)
		return
	}
	// 类型
	elemType = TType(b)
	// 读取32位 也就是4字节
	size32, e := p.ReadI32(ctx)
	if e != nil {
		err = NewTProtocolException(e)
		return
	}
	err = checkSizeForProtocol(size32, p.cfg)
	if err != nil {
		return
	}
	// list的长度
	size = int(size32)

	return
}

ReadListEnd

func (p *TBinaryProtocol) ReadListEnd(ctx context.Context) error {
	return nil
}
// param ctx
func (p *TBinaryProtocol) ReadSetBegin(ctx context.Context) (elemType TType, size int, err error) {
	// 读取一字节
	b, e := p.ReadByte(ctx)
	if e != nil {
		err = NewTProtocolException(e)
		return
	}
	// set里的元素类型
	elemType = TType(b)

	// 读取32位 4字节
	size32, e := p.ReadI32(ctx)
	if e != nil {
		err = NewTProtocolException(e)
		return
	}
	// 校验大小
	err = checkSizeForProtocol(size32, p.cfg)
	if err != nil {
		return
	}
	// set大小
	size = int(size32)
	return elemType, size, nil
}

ReadSetEnd

func (p *TBinaryProtocol) ReadSetEnd(ctx context.Context) error {
	return nil
}

ReadBool

func (p *TBinaryProtocol) ReadBool(ctx context.Context) (bool, error) {
	// 读取1字节 
	b, e := p.ReadByte(ctx)
	// bool用byte类型存储  true:1 false:0 
	v := true
	if b != 1 {
		v = false
	}
	return v, e
}

ReadByte

func (p *TBinaryProtocol) ReadByte(ctx context.Context) (int8, error) {
	v, err := p.trans.ReadByte()
	return int8(v), err
}

ReadI16

// 
func (p *TBinaryProtocol) ReadI16(ctx context.Context) (value int16, err error) {
	buf := p.buffer[0:2]
	// 读取2字节
	err = p.readAll(ctx, buf)
	// 从字节数组里读取int16 
	value = int16(binary.BigEndian.Uint16(buf))
	return value, err
}

ReadI32

func (p *TBinaryProtocol) ReadI32(ctx context.Context) (value int32, err error) {
	buf := p.buffer[0:4]
	// 读取4字节
	err = p.readAll(ctx, buf)
	// 从字节数组读取int32
	value = int32(binary.BigEndian.Uint32(buf))
	return value, err
}

ReadI64

func (p *TBinaryProtocol) ReadI64(ctx context.Context) (value int64, err error) {
	buf := p.buffer[0:8]
	// 读取8字节
	err = p.readAll(ctx, buf)
	// 从字节数组读取int64
	value = int64(binary.BigEndian.Uint64(buf))
	return value, err
}

ReadDouble

func (p *TBinaryProtocol) ReadDouble(ctx context.Context) (value float64, err error) {
	buf := p.buffer[0:8]
	// 读取8字节
	err = p.readAll(ctx, buf)
	// 字节数组转uint64  uint64转float64
	value = math.Float64frombits(binary.BigEndian.Uint64(buf))
	return value, err
}

ReadString

func (p *TBinaryProtocol) ReadString(ctx context.Context) (value string, err error) {
	// 读取32位 也就是4字节
	// size就是字符串的长度
	size, e := p.ReadI32(ctx)
	if e != nil {
		return "", e
	}
	err = checkSizeForProtocol(size, p.cfg)
	if err != nil {
		return
	}
	if size == 0 {
		return "", nil
	}
	// 大小比64字节小 直接读取
	if size < int32(len(p.buffer)) {
		// Avoid allocation on small reads
		buf := p.buffer[:size]
		read, e := io.ReadFull(p.trans, buf)
		return string(buf[:read]), NewTProtocolException(e)
	}

	// 根据长度读取字符串
	return p.readStringBody(size)
}

readStringBody

func (p *TBinaryProtocol) readStringBody(size int32) (value string, err error) {
	buf, err := safeReadBytes(size, p.trans)
	return string(buf), NewTProtocolException(err)
}
// 它尝试从 trans 读取 size 字节,以防止在 size 非常大(主要是由格式错误的消息引起)时进行大量分配。
// It tries to read size bytes from trans, in a way that prevents large
// allocations when size is insanely large (mostly caused by malformed message).
func safeReadBytes(size int32, trans io.Reader) ([]byte, error) {
	if size < 0 {
		return nil, nil
	}

	buf := new(bytes.Buffer)
	// 读取size字节
	_, err := io.CopyN(buf, trans, int64(size))
	return buf.Bytes(), err
}

TJSONProtocol

// for references to _ParseContext see tsimplejson_protocol.go

// thrift JSON 协议实现。 使用简单的 JSON 协议
// 
// JSON protocol implementation for thrift.
// Utilizes Simple JSON protocol
//
type TJSONProtocol struct {
	// 组合(继承)TSimpleJSONProtocol的功能
	*TSimpleJSONProtocol
}
// 简单 JSON 协议实现
//
// 该协议生成/使用适合脚本语言解析的简单输出格式。它不应与功能齐全的 TJSONProtocol 混淆。
//
// Simple JSON protocol implementation for thrift.
//
// This protocol produces/consumes a simple output format
// suitable for parsing by scripting languages.  It should not be
// confused with the full-featured TJSONProtocol.
//
type TSimpleJSONProtocol struct {
	trans TTransport  // 传输

	cfg *TConfiguration // 配置

	parseContextStack jsonContextStack // 解析上下文栈  用[]int实现
	dumpContext       jsonContextStack // 导出上线文  用[]int实现

	writer *bufio.Writer  //
	reader *bufio.Reader  // 
}
// package thrift  // simple_json_protocol.go 

var (
	JSON_COMMA                   []byte
	JSON_COLON                   []byte
	JSON_LBRACE                  []byte
	JSON_RBRACE                  []byte
	JSON_LBRACKET                []byte
	JSON_RBRACKET                []byte
	JSON_QUOTE                   byte
	JSON_QUOTE_BYTES             []byte
	JSON_NULL                    []byte
	JSON_TRUE                    []byte
	JSON_FALSE                   []byte
	JSON_INFINITY                string
	JSON_NEGATIVE_INFINITY       string
	JSON_NAN                     string
	JSON_INFINITY_BYTES          []byte
	JSON_NEGATIVE_INFINITY_BYTES []byte
	JSON_NAN_BYTES               []byte
)

func init() {
	JSON_COMMA = []byte{','}
	JSON_COLON = []byte{':'}
	JSON_LBRACE = []byte{'{'}
	JSON_RBRACE = []byte{'}'}
	JSON_LBRACKET = []byte{'['}
	JSON_RBRACKET = []byte{']'}
	JSON_QUOTE = '"'
	JSON_QUOTE_BYTES = []byte{'"'}
	JSON_NULL = []byte{'n', 'u', 'l', 'l'}
	JSON_TRUE = []byte{'t', 'r', 'u', 'e'}
	JSON_FALSE = []byte{'f', 'a', 'l', 's', 'e'}
	JSON_INFINITY = "Infinity"
	JSON_NEGATIVE_INFINITY = "-Infinity"
	JSON_NAN = "NaN"
	JSON_INFINITY_BYTES = []byte{'I', 'n', 'f', 'i', 'n', 'i', 't', 'y'}
	JSON_NEGATIVE_INFINITY_BYTES = []byte{'-', 'I', 'n', 'f', 'i', 'n', 'i', 't', 'y'}
	JSON_NAN_BYTES = []byte{'N', 'a', 'N'}
}

WriteMessageBegin

func (p *TJSONProtocol) WriteMessageBegin(ctx context.Context, name string, typeId TMessageType, seqId int32) error {
	// 重置json上下文栈  这个栈是用int[]实现的 
	p.resetContextStack() // THRIFT-3735
	// 写入list开始 [
	if e := p.OutputListBegin(); e != nil {
		return e
	}
	// 写入thrift json 协议版本  4字节
	if e := p.WriteI32(ctx, THRIFT_JSON_PROTOCOL_VERSION); e != nil {
		return e
	}
	// 写入方法名 
	if e := p.WriteString(ctx, name); e != nil {
		return e
	}
	// 写入thrift类型
	if e := p.WriteByte(ctx, int8(typeId)); e != nil {
		return e
	}
	// 写入序列Id
	if e := p.WriteI32(ctx, seqId); e != nil {
		return e
	}
	return nil
}
func (p *TSimpleJSONProtocol) OutputListBegin() error {
	// 
	if e := p.OutputPreValue(); e != nil {
		return e
	}
	if _, e := p.write(JSON_LBRACKET); e != nil {
		return NewTProtocolException(e)
	}
	p.dumpContext.push(_CONTEXT_IN_LIST_FIRST)
	return nil
}

WriteMessageEnd

func (p *TJSONProtocol) WriteMessageEnd(ctx context.Context) error {
	// 
	return p.OutputListEnd()
}
// 
func (p *TSimpleJSONProtocol) OutputListEnd() error {
	// 写入]
	if _, e := p.write(JSON_RBRACKET); e != nil {
		return NewTProtocolException(e)
	}
	// 出栈
	_, ok := p.dumpContext.pop()
	if !ok {
		return errEmptyJSONContextStack
	}
	// 需要debug
	if e := p.OutputPostValue(); e != nil {
		return e
	}
	return nil
}
func (p *TSimpleJSONProtocol) OutputPostValue() error {
	cxt, ok := p.dumpContext.peek()
	if !ok {
		return errEmptyJSONContextStack
	}
	switch cxt {
	case _CONTEXT_IN_LIST_FIRST:
		p.dumpContext.pop()
		p.dumpContext.push(_CONTEXT_IN_LIST)
	case _CONTEXT_IN_OBJECT_FIRST:
		p.dumpContext.pop()
		p.dumpContext.push(_CONTEXT_IN_OBJECT_NEXT_VALUE)
	case _CONTEXT_IN_OBJECT_NEXT_KEY:
		p.dumpContext.pop()
		p.dumpContext.push(_CONTEXT_IN_OBJECT_NEXT_VALUE)
	case _CONTEXT_IN_OBJECT_NEXT_VALUE:
		p.dumpContext.pop()
		p.dumpContext.push(_CONTEXT_IN_OBJECT_NEXT_KEY)
	}
	return nil
}

thrift Java源码解析

Java版thrift 版本 0.16.0
源码链接: https://github.com/apache/thrift/tree/0.16.0/lib/java

TSocket transport = new TSocket("localhost", 9090);
transport.open();

TBinaryProtocol protocol = new TBinaryProtocol(transport);

TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
Calculator.Client service = new Calculator.Client(mp);

TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
WeatherReport.Client service2 = new WeatherReport.Client(mp2);

System.out.println(service.add(2,2));
System.out.println(service2.getTemperature());

() 思考

协议支持多版本

RPC协议设计中,协议可能升级,我们需要考虑协议及数据格式的多版本问题。

比如 thrift v0.10.0 和 thrift v0.16.0 里的TBinaryProtocol可以

同一thrift版本协议支持多版本idl吗?

v1版本idl 老代码

v2版本idl 新增1个字段

版本管理

idl新增字段,服务端上线后,调用端上线中,会存在v1版本的idl调用服务端,还会存v2版本的idl调用服务端。

idl新增字段,服务端上线中会存在2个版本的idl,没有新增字段的v1版本idl和有新增字段的v2版本的idl,这个时候客户端还没有上线。

向前兼容

新程序就需要同时能够解析 v1 和 v2 版本的数据。

向后兼容

老程序仍然可以读新的数据格式,也就是有向后兼容的能力。

thrift是怎么解决的

Thrift 里的 TBinaryProtocol 的实现方式也很简单,那就是顺序写入数据的过程中,不仅会写入数据的值(field-value),还会写入数据的编号(field-id)和类型(field-type);读取的时候也一样。并且,在每一条记录的结束都会写下一个标志位。

在读取数据的时候,老版本的 v1 代码,看到自己没有见过的编号(field-id 对应idl里的id)就可以跳过。
新版本的 v2 代码,对于老数据里没有的字段,也就是读不到值而已,并不会出现不兼容的情况。

顺序排列的编号,就起到了版本的作用,而我们不需要再专门去进行数据版本的管理了。

写数据逻辑

writeField1

func (p *CalculatorAddArgs) writeField1(ctx context.Context, oprot thrift.TProtocol) (err error) {
  // 写字段名
  if err := oprot.WriteFieldBegin(ctx, "num1", thrift.I32, 1); err != nil {
	return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:num1: ", p), err) }
  // 写字段值	
  if err := oprot.WriteI32(ctx, int32(p.Num1)); err != nil {
  return thrift.PrependError(fmt.Sprintf("%T.num1 (1) field write error: ", p), err) }
  // 写字段结束标记
  if err := oprot.WriteFieldEnd(ctx); err != nil {
    return thrift.PrependError(fmt.Sprintf("%T write field end error 1:num1: ", p), err) }
  return err
}

WriteFieldBegin

// param name 字段名
// param typeId 字段类型
// param id  字段对应idl里的id
func (p *TBinaryProtocol) WriteFieldBegin(ctx context.Context, name string, typeId TType, id int16) error {
	// 写字段类型 8位 也就是1字节
	e := p.WriteByte(ctx, int8(typeId))
	if e != nil {
		return e
	}
	// 写入idl里的id  16位 2字节
	e = p.WriteI16(ctx, id)
	return e
}

WriteFieldEnd

func (p *TBinaryProtocol) WriteFieldEnd(ctx context.Context) error {
	return nil
}

ReadFieldBegin

// 
func (p *TBinaryProtocol) ReadFieldBegin(ctx context.Context) (name string, typeId TType, seqId int16, err error) {
	// 获取对应的thrift字段类型  读取前8位 也就是1字节  
	t, err := p.ReadByte(ctx)
	typeId = TType(t)
	if err != nil {
		return name, typeId, seqId, err
	}
	if t != STOP {
		// 获取idl里设置的id   16位 2字节  
		seqId, err = p.ReadI16(ctx)
	}
	// 返回的name是空的 
	return name, typeId, seqId, err
}

ReadFieldEnd

func (p *TBinaryProtocol) ReadFieldEnd(ctx context.Context) error {
	return nil
}

idl里可以删除字段吗?

如果字段调用方还在使用,不建议删除

如果字段调用方已经不使用了,可以删除字段,但别删除idl里对应的id

参考资料

[1] thrift
[2] 协议和编解码
[3] rpc之thrift入门与TBinaryProtocol源码追踪
[4] THRIFT-TPROTOCOL
[5] thrift简单示例 (go语言)
[6] 11 | 通过Thrift序列化:我们要预知未来才能向后兼容吗?