Go kit
Go kit 是一个微服务工具库。
stringsvc1
我们从 stringsvc1 样例工程读起:
package main
import (
"context"
"encoding/json"
"errors"
"log"
"net/http"
"strings"
"github.com/go-kit/kit/endpoint"
httptransport "github.com/go-kit/kit/transport/http"
)
/*
* =========== 服务接口与服务实现 ===========
*/
// StringService provides operations on strings.
type StringService interface {
Uppercase(string) (string, error)
Count(string) int
}
// stringService is a concrete implementation of StringService
type stringService struct{}
func (stringService) Uppercase(s string) (string, error) {
if s == "" {
return "", ErrEmpty
}
return strings.ToUpper(s), nil
}
func (stringService) Count(s string) int {
return len(s)
}
/*
* =========== 服务请求与响应的格式 ===========
*/
// ErrEmpty is returned when an input string is empty.
var ErrEmpty = errors.New("empty string")
// For each method, we define request and response structs
type uppercaseRequest struct {
S string `json:"s"`
}
type uppercaseResponse struct {
V string `json:"v"`
Err string `json:"err,omitempty"` // errors don't define JSON marshaling
}
type countRequest struct {
S string `json:"s"`
}
type countResponse struct {
V int `json:"v"`
}
/*
* =========== 端点 ===========
* 端点表示的是单个 RPC 方法
*/
// Endpoints are a primary abstraction in go-kit. An endpoint represents a single RPC (method in our service interface)
func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
return func(_ context.Context, request interface{}) (interface{}, error) {
req := request.(uppercaseRequest)
v, err := svc.Uppercase(req.S)
if err != nil {
return uppercaseResponse{v, err.Error()}, nil
}
return uppercaseResponse{v, ""}, nil
}
}
func makeCountEndpoint(svc StringService) endpoint.Endpoint {
return func(_ context.Context, request interface{}) (interface{}, error) {
req := request.(countRequest)
v := svc.Count(req.S)
return countResponse{v}, nil
}
}
/*
* =========== 主程序 ===========
* 将服务暴露到网络上
* transport:传输协议
*/
// Transports expose the service to the network. In this first example we utilize JSON over HTTP.
func main() {
svc := stringService{}
uppercaseHandler := httptransport.NewServer(
makeUppercaseEndpoint(svc),
decodeUppercaseRequest,
encodeResponse,
)
countHandler := httptransport.NewServer(
makeCountEndpoint(svc),
decodeCountRequest,
encodeResponse,
)
http.Handle("/uppercase", uppercaseHandler)
http.Handle("/count", countHandler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
/*
* =========== 编码译码 ===========
* 在有差异的数据体与无差异的 JSON 文本之间转换
*/
func decodeUppercaseRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request uppercaseRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func decodeCountRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request countRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
stringsvc2
examples/stringsvc2 at master · go-kit/examples (github.com)
将项目简单分层,然后引入和日志和监控中间件。值得一提的是他的中间件的实现方式。它通过接口约定,使得定义了一个接口服务对象之后,让中间件实现相同的接口,并将实现后的实例包裹之前的实例,这样调用是从最外层中间件开始,利用函数栈 next. 服务名 调用内层的实例,然后再从内层出来。
// 创建服务
var svc StringService
svc = stringService{}
// 创建一个日志中间件,附加到服务上
svc = loggingMiddleware{logger, svc}
// 再把监控中间件附加到服务上
svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}
项目分成了五个文件:
service.go
type StringService
type stringService
func Uppercase
func Count
var ErrEmpty
transport.go
func makeUppercaseEndpoint
func makeCountEndpoint
func decodeUppercaseRequest
func decodeCountRequest
func encodeResponse
type uppercaseRequest
type uppercaseResponse
type countRequest
type countResponse
logging.go:实现了一个日志中间件
package main
import (
"time"
"github.com/go-kit/kit/log"
)
type loggingMiddleware struct {
logger log.Logger
next StringService
}
func (mw loggingMiddleware) Uppercase(s string) (output string, err error) {
defer func(begin time.Time) {
_ = mw.logger.Log(
"method", "uppercase",
"input", s,
"output", output,
"err", err,
"took", time.Since(begin),
)
}(time.Now())
output, err = mw.next.Uppercase(s)
return
}
func (mw loggingMiddleware) Count(s string) (n int) {
defer func(begin time.Time) {
_ = mw.logger.Log(
"method", "count",
"input", s,
"n", n,
"took", time.Since(begin),
)
}(time.Now())
n = mw.next.Count(s)
return
}
instrumenting.go
实现了监控中间件
main.go
package main
import (
"net/http"
"os"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/go-kit/kit/log"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
httptransport "github.com/go-kit/kit/transport/http"
)
func main() {
// 使用 STD 流的日志器
logger := log.NewLogfmtLogger(os.Stderr)
fieldKeys := []string{"method", "error"}
requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "my_group",
Subsystem: "string_service",
Name: "request_count",
Help: "Number of requests received.",
}, fieldKeys)
requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "my_group",
Subsystem: "string_service",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, fieldKeys)
countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "my_group",
Subsystem: "string_service",
Name: "count_result",
Help: "The result of each count method.",
}, []string{}) // no fields here
// 创建服务
var svc StringService
svc = stringService{}
// 创建一个日志中间件,附加到服务上
svc = loggingMiddleware{logger, svc}
// 再把监控中间件附加到服务上
svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}
uppercaseHandler := httptransport.NewServer(
makeUppercaseEndpoint(svc),
decodeUppercaseRequest,
encodeResponse,
)
countHandler := httptransport.NewServer(
makeCountEndpoint(svc),
decodeCountRequest,
encodeResponse,
)
http.Handle("/uppercase", uppercaseHandler)
http.Handle("/count", countHandler)
http.Handle("/metrics", promhttp.Handler())
logger.Log("msg", "HTTP", "addr", ":8080")
logger.Log("err", http.ListenAndServe(":8080", nil))
stringsvc3
下面代码多起来了,建议克隆仓库后自己在 IDE 里看
stringsvc3 加入了服务间通信。假定我们得调用另一个服务才能完成 Uppercase 的功能。于是有了代理中间件。
我们先分析调用流程。
main.go
func main() {
var (
// ...
// 增加了一个 proxy cli 参数
proxy = flag.String("proxy", "", "Optional comma-separated list of URLs to proxy uppercase requests")
)
flag.Parse()
var logger log.Logger
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "listen", *listen, "caller", log.DefaultCaller)
//... 日志和监控等。代码没啥变化
logger.Log("err", http.ListenAndServe(*listen, nil))
}
/uppercase 请求被 uppercaseHandler
处理,后者 调用 makeUppercaseEndpoint:
func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(uppercaseRequest)
v, err := svc.Uppercase(req.S)
if err != nil {
return uppercaseResponse{v, err.Error()}, nil
}
return uppercaseResponse{v, ""}, nil
}
}
这里调用 svc.Uppercase
,是一个接口方法。其实现位于何处?分析 main.go:
var svc StringService
svc = stringService{}
svc = proxyingMiddleware(context.Background(), *proxy, logger)(svc)
svc = loggingMiddleware(logger)(svc)
svc = instrumentingMiddleware(requestCount, requestLatency, countResult)(svc)
可以看到 proxyingMiddleware
是其实现。上面 svc = proxyingMiddleware (context.Background (), *proxy, logger)(svc)
传入了字符串 *proxy
,在 proxyingMiddleware 中作为 instances
变量值。将其用逗号分割成数组,对每个元素执行 makeUppercaseProxy
:
for _, instance := range instanceList {
var e endpoint.Endpoint
e = makeUppercaseProxy(ctx, instance)
e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e)
endpointer = append(endpointer, e)
}
实际上就是生成 http endpoint:
func makeUppercaseProxy(ctx context.Context, instance string) endpoint.Endpoint {
if !strings.HasPrefix(instance, "http") {
instance = "http://" + instance
}
u, err := url.Parse(instance)
if err != nil {
panic(err)
}
if u.Path == "" {
u.Path = "/uppercase"
}
return httptransport.NewClient(
"GET",
u,
encodeRequest,
decodeUppercaseResponse,
).Endpoint()
}
在之前的 for 循环中还有两个语句:
e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(maxQPS), int64(maxQPS)))(e)
在讲这个之前,我们讲一下服务发现。Go kit 提供了发布订阅模式的服务发现
type Subscriber interface {
Endpoints() ([]endpoint.Endpoint, error)
}
circuit breakers 是断路器, rate limiters 是限流器。所以这里是给每个下级服务增加了熔断、限流机制,然后添加到订阅列表。
最后用这些订阅构建了一个单端点,支持重试和负载均衡,以一个 StringService 的形式返回:
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(maxAttempts, maxTime, balancer)
return func(next StringService) StringService {
return proxymw{next, retry}
}
stringsvc 总结
虽然是多个微服务,其实还是写在了一个项目里,通过 proxy 参数的有无决定方法是自己执行还是让下一级服务执行。
addsvc
下面我们分析 addsvc 例,看一下分布式追踪、客户端包的实现。
examples/addsvc at master · go-kit/examples (github.com)
addsvc 是一个微服务的例子,它充分利用了 Go 工具包的大部分功能,包括服务和传输级中间件,同时采用多个通信协议,分布式跟踪和丰富的错误定义。服务器二进制文件可在 cmd/addsvc 中找到。客户端二进制文件可在 cmd/addcli 中找到。
最后,addtransport 包为每个支持的通信协议提供服务器和客户端。客户端结构包含了某些中间件,以展示客户端库的模式。但请注意:客户端库通常是个坏主意,因为它们很容易导致 “分布式整体” 的反模式(即错误的设计模式)。如果你不知道是否有必要使用,那么最好不用:最好把逻辑转移到消费者身上,并依靠契约测试来检测不兼容的地方。
契约测试
我们在测试某个微服务的时候,往往要调用其它微服务,为了在出错时明确是自己的代码问题,而不是因为其它服务的问题,常常使用替身测试(就是前后端开发常用的 Mock),即构建一个虚拟的外部环境。但是这样如果真实的外部环境变了,我们的服务未必能知道。
契约测试说白了就是引入一个契约,微服务和它调用的其它微服务共同遵守这个契约(面向接口,只不过这个接口变得更高级了,因为契约感知变化(契约的违反))
Thrift
Thrift 是 Apache 的一个开源项目,是一个 RPC 框架。
Protobuf
Google 开发的序列化协议。