Go kit

Go kit 是一个微服务工具库。

官网 文档 Github

stringsvc1

我们从 stringsvc1 样例工程读起:

  
package main

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"strings"

	"github.com/go-kit/kit/endpoint"
	httptransport "github.com/go-kit/kit/transport/http"
)
/*
 * =========== 服务接口与服务实现 ===========
 */
// StringService provides operations on strings.
type StringService interface {
	Uppercase(string) (string, error)
	Count(string) int
}

// stringService is a concrete implementation of StringService
type stringService struct{}

func (stringService) Uppercase(s string) (string, error) {
	if s == "" {
		return "", ErrEmpty
	}
	return strings.ToUpper(s), nil
}

func (stringService) Count(s string) int {
	return len(s)
}

/*
 * =========== 服务请求与响应的格式 ===========
 */

// ErrEmpty is returned when an input string is empty.
var ErrEmpty = errors.New("empty string")

// For each method, we define request and response structs
type uppercaseRequest struct {
	S string `json:"s"`
}

type uppercaseResponse struct {
	V   string `json:"v"`
	Err string `json:"err,omitempty"` // errors don't define JSON marshaling
}

type countRequest struct {
	S string `json:"s"`
}

type countResponse struct {
	V int `json:"v"`
}
/*
 * =========== 端点 ===========
 * 端点表示的是单个 RPC 方法
 */
// Endpoints are a primary abstraction in go-kit. An endpoint represents a single RPC (method in our service interface)
func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (interface{}, error) {
		req := request.(uppercaseRequest)
		v, err := svc.Uppercase(req.S)
		if err != nil {
			return uppercaseResponse{v, err.Error()}, nil
		}
		return uppercaseResponse{v, ""}, nil
	}
}

func makeCountEndpoint(svc StringService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (interface{}, error) {
		req := request.(countRequest)
		v := svc.Count(req.S)
		return countResponse{v}, nil
	}
}



/*
 * =========== 主程序 ===========
 * 将服务暴露到网络上
 * transport:传输协议
 */

// Transports expose the service to the network. In this first example we utilize JSON over HTTP.
func main() {
	svc := stringService{}

	uppercaseHandler := httptransport.NewServer(
		makeUppercaseEndpoint(svc),
		decodeUppercaseRequest,
		encodeResponse,
	)

	countHandler := httptransport.NewServer(
		makeCountEndpoint(svc),
		decodeCountRequest,
		encodeResponse,
	)

	http.Handle("/uppercase", uppercaseHandler)
	http.Handle("/count", countHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

/*
 * =========== 编码译码 ===========
 * 在有差异的数据体与无差异的 JSON 文本之间转换
 */
func decodeUppercaseRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var request uppercaseRequest
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		return nil, err
	}
	return request, nil
}

func decodeCountRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var request countRequest
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		return nil, err
	}
	return request, nil
}

func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
	return json.NewEncoder(w).Encode(response)
}

stringsvc2

examples/stringsvc2 at master · go-kit/examples (github.com)

将项目简单分层,然后引入和日志和监控中间件。值得一提的是他的中间件的实现方式。它通过接口约定,使得定义了一个接口服务对象之后,让中间件实现相同的接口,并将实现后的实例包裹之前的实例,这样调用是从最外层中间件开始,利用函数栈 next. 服务名 调用内层的实例,然后再从内层出来。

    // 创建服务
	var svc StringService
	svc = stringService{}
    // 创建一个日志中间件,附加到服务上
	svc = loggingMiddleware{logger, svc}
    // 再把监控中间件附加到服务上
	svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}

项目分成了五个文件:

service.go

type StringService
type stringService
func Uppercase
func Count
var ErrEmpty

transport.go

func makeUppercaseEndpoint
func makeCountEndpoint
func decodeUppercaseRequest
func decodeCountRequest
func encodeResponse
type uppercaseRequest
type uppercaseResponse
type countRequest
type countResponse

logging.go:实现了一个日志中间件

  
package main

import (
	"time"

	"github.com/go-kit/kit/log"
)

type loggingMiddleware struct {
	logger log.Logger
	next   StringService
}

func (mw loggingMiddleware) Uppercase(s string) (output string, err error) {
	defer func(begin time.Time) {
		_ = mw.logger.Log(
			"method", "uppercase",
			"input", s,
			"output", output,
			"err", err,
			"took", time.Since(begin),
		)
	}(time.Now())

	output, err = mw.next.Uppercase(s)
	return
}

func (mw loggingMiddleware) Count(s string) (n int) {
	defer func(begin time.Time) {
		_ = mw.logger.Log(
			"method", "count",
			"input", s,
			"n", n,
			"took", time.Since(begin),
		)
	}(time.Now())

	n = mw.next.Count(s)
	return
}

instrumenting.go

实现了监控中间件

main.go

package main

import (
	"net/http"
	"os"

	stdprometheus "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/go-kit/kit/log"
	kitprometheus "github.com/go-kit/kit/metrics/prometheus"
	httptransport "github.com/go-kit/kit/transport/http"
)

func main() {
    // 使用 STD 流的日志器
	logger := log.NewLogfmtLogger(os.Stderr)

	fieldKeys := []string{"method", "error"}
	requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "request_count",
		Help:      "Number of requests received.",
	}, fieldKeys)
	requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "request_latency_microseconds",
		Help:      "Total duration of requests in microseconds.",
	}, fieldKeys)
	countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "count_result",
		Help:      "The result of each count method.",
	}, []string{}) // no fields here
	
    // 创建服务
	var svc StringService
	svc = stringService{}
    // 创建一个日志中间件,附加到服务上
	svc = loggingMiddleware{logger, svc}
    // 再把监控中间件附加到服务上
	svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}

	uppercaseHandler := httptransport.NewServer(
		makeUppercaseEndpoint(svc),
		decodeUppercaseRequest,
		encodeResponse,
	)

	countHandler := httptransport.NewServer(
		makeCountEndpoint(svc),
		decodeCountRequest,
		encodeResponse,
	)

	http.Handle("/uppercase", uppercaseHandler)
	http.Handle("/count", countHandler)
	http.Handle("/metrics", promhttp.Handler())
	logger.Log("msg", "HTTP", "addr", ":8080")
	logger.Log("err", http.ListenAndServe(":8080", nil))

stringsvc3

下面代码多起来了,建议克隆仓库后自己在 IDE 里看

stringsvc3 加入了服务间通信。假定我们得调用另一个服务才能完成 Uppercase 的功能。于是有了代理中间件。

我们先分析调用流程。

main.go


func main() {
	var (
         // ...
         // 增加了一个 proxy cli 参数
		proxy  = flag.String("proxy", "", "Optional comma-separated list of URLs to proxy uppercase requests")
	)
	flag.Parse()

	var logger log.Logger
	logger = log.NewLogfmtLogger(os.Stderr)
	logger = log.With(logger, "listen", *listen, "caller", log.DefaultCaller)
 
	//... 日志和监控等。代码没啥变化
	logger.Log("err", http.ListenAndServe(*listen, nil))
}

/uppercase 请求被 uppercaseHandler 处理,后者 调用 makeUppercaseEndpoint:

func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		req := request.(uppercaseRequest)
		v, err := svc.Uppercase(req.S)
		if err != nil {
			return uppercaseResponse{v, err.Error()}, nil
		}
		return uppercaseResponse{v, ""}, nil
	}
}

这里调用 svc.Uppercase,是一个接口方法。其实现位于何处?分析 main.go

	var svc StringService
	svc = stringService{}
	svc = proxyingMiddleware(context.Background(), *proxy, logger)(svc)
	svc = loggingMiddleware(logger)(svc)
	svc = instrumentingMiddleware(requestCount, requestLatency, countResult)(svc)

可以看到 proxyingMiddleware 是其实现。上面 svc = proxyingMiddleware (context.Background (), *proxy, logger)(svc) 传入了字符串 *proxy,在 proxyingMiddleware 中作为 instances 变量值。将其用逗号分割成数组,对每个元素执行 makeUppercaseProxy

	for _, instance := range instanceList {
		var e endpoint.Endpoint
		e = makeUppercaseProxy(ctx, instance)
		e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
		e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e)
		endpointer = append(endpointer, e)
	}

实际上就是生成 http endpoint:



func makeUppercaseProxy(ctx context.Context, instance string) endpoint.Endpoint {
	if !strings.HasPrefix(instance, "http") {
		instance = "http://" + instance
	}
	u, err := url.Parse(instance)
	if err != nil {
		panic(err)
	}
	if u.Path == "" {
		u.Path = "/uppercase"
	}
	return httptransport.NewClient(
		"GET",
		u,
		encodeRequest,
		decodeUppercaseResponse,
	).Endpoint()
}

在之前的 for 循环中还有两个语句:

e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(maxQPS), int64(maxQPS)))(e)

在讲这个之前,我们讲一下服务发现。Go kit 提供了发布订阅模式的服务发现

type Subscriber interface {
	Endpoints() ([]endpoint.Endpoint, error)
}

circuit breakers 是断路器, rate limiters 是限流器。所以这里是给每个下级服务增加了熔断、限流机制,然后添加到订阅列表。

最后用这些订阅构建了一个单端点,支持重试和负载均衡,以一个 StringService 的形式返回:

	balancer := lb.NewRoundRobin(subscriber)
	retry := lb.Retry(maxAttempts, maxTime, balancer)

	return func(next StringService) StringService {
		return proxymw{next, retry}
	}

stringsvc 总结

虽然是多个微服务,其实还是写在了一个项目里,通过 proxy 参数的有无决定方法是自己执行还是让下一级服务执行。

addsvc

下面我们分析 addsvc 例,看一下分布式追踪、客户端包的实现。

examples/addsvc at master · go-kit/examples (github.com)

addsvc 是一个微服务的例子,它充分利用了 Go 工具包的大部分功能,包括服务和传输级中间件,同时采用多个通信协议,分布式跟踪和丰富的错误定义。服务器二进制文件可在 cmd/addsvc 中找到。客户端二进制文件可在 cmd/addcli 中找到。

最后,addtransport 包为每个支持的通信协议提供服务器和客户端。客户端结构包含了某些中间件,以展示客户端库的模式。但请注意:客户端库通常是个坏主意,因为它们很容易导致 “分布式整体” 的反模式(即错误的设计模式)。如果你不知道是否有必要使用,那么最好不用:最好把逻辑转移到消费者身上,并依靠契约测试来检测不兼容的地方。

契约测试

我们在测试某个微服务的时候,往往要调用其它微服务,为了在出错时明确是自己的代码问题,而不是因为其它服务的问题,常常使用替身测试(就是前后端开发常用的 Mock),即构建一个虚拟的外部环境。但是这样如果真实的外部环境变了,我们的服务未必能知道。

契约测试说白了就是引入一个契约,微服务和它调用的其它微服务共同遵守这个契约(面向接口,只不过这个接口变得更高级了,因为契约感知变化(契约的违反))

聊一聊契约测试 - Thoughtworks 洞见

Thrift

Thrift 是 Apache 的一个开源项目,是一个 RPC 框架。

Apache Thrift - Home

Protobuf

Google 开发的序列化协议。