diff --git a/example/client_test.go b/example/client_test.go new file mode 100644 index 0000000..ac35169 --- /dev/null +++ b/example/client_test.go @@ -0,0 +1,22 @@ +package main + +import ( + "context" + "fmt" + "git.diulo.com/mogfee/kit/transport/http" + "testing" +) + +func TestCall(t *testing.T) { + var out UserAddRequest + path := "/api/v1/answer/listCategory" + client := http.Client{} + ctx := context.Background() + in := UserAddRequest{Name: "tom"} + + err := client.Invoke(ctx, "POST", path, in, &out) + if err != nil { + t.Error(err) + } + fmt.Println(out) +} diff --git a/selector/balancer.go b/selector/balancer.go index e1173fb..8266aae 100644 --- a/selector/balancer.go +++ b/selector/balancer.go @@ -6,10 +6,10 @@ import ( ) type Balancer interface { - Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, di DoneFunc, err error) + Pick(ctx context.Context, nodes []WeightedNode) (WeightedNode, DoneFunc, error) } type BalancerBuilder interface { - Build() Balancer + Builder() Balancer } type WeightedNode interface { Node @@ -18,7 +18,6 @@ type WeightedNode interface { Pick() DoneFunc PickElapsed() time.Duration } - type WeightedNodeBuilder interface { Build(Node) WeightedNode } diff --git a/selector/default_node.go b/selector/default_node.go index 3274778..47602df 100644 --- a/selector/default_node.go +++ b/selector/default_node.go @@ -22,7 +22,7 @@ func (d *DefaultNode) Address() string { return d.address } -func (d *DefaultNode) ServerName() string { +func (d *DefaultNode) ServiceName() string { return d.name } @@ -39,7 +39,7 @@ func (d *DefaultNode) Metadata() map[string]string { } func NewNode(scheme string, address string, ins *registry.ServiceInstance) Node { - node := DefaultNode{ + node := &DefaultNode{ scheme: scheme, address: address, } @@ -53,5 +53,5 @@ func NewNode(scheme string, address string, ins *registry.ServiceInstance) Node } } } - return &node + return node } diff --git a/selector/default_selector.go b/selector/default_selector.go index 1b15760..68dfb9d 100644 --- a/selector/default_selector.go +++ b/selector/default_selector.go @@ -5,53 +5,62 @@ import ( "sync/atomic" ) -type DefaultSelector struct { - nodes atomic.Value - Balancer Balancer +var ( + _ Rebalancer = (*Default)(nil) + _ Builder = (*DefaultBuilder)(nil) +) + +type Default struct { NodeBuilder WeightedNodeBuilder -} + Balancer Balancer -func (d *DefaultSelector) Apply(nodes []Node) { - newNodes := make([]WeightedNode, 0, len(nodes)) - for _, v := range nodes { - newNodes = append(newNodes, d.NodeBuilder.Build(v)) - } - d.nodes.Store(newNodes) + nodes atomic.Value } -func (d *DefaultSelector) Select(ctx context.Context, opts ...SelectOption) (selected WeightedNode, di DoneFunc, err error) { - var options SelectOptions - for _, o := range opts { - o(&options) - } +func (d *Default) Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error) { + var ( + options SelectOptions + candidates []WeightedNode + ) nodes, ok := d.nodes.Load().([]WeightedNode) if !ok { return nil, nil, ErrNoAvailable } - conditions := make([]WeightedNode, 0) + for _, o := range opts { + o(&options) + } if len(options.NodeFilters) > 0 { - newNodes := make([]Node, 0) - for _, v := range nodes { - newNodes = append(newNodes, v) + newNodes := make([]Node, len(nodes)) + for i, wc := range nodes { + newNodes[i] = wc } for _, filter := range options.NodeFilters { newNodes = filter(ctx, newNodes) } - conditions = make([]WeightedNode, 0, len(newNodes)) - for _, v := range newNodes { - conditions = append(conditions, v.(WeightedNode)) + candidates = make([]WeightedNode, len(newNodes)) + for i, n := range newNodes { + candidates[i] = n.(WeightedNode) } } else { - conditions = nodes + candidates = nodes } - pick, doneFunc, err := d.Balancer.Pick(ctx, conditions) + wn, done, err := d.Balancer.Pick(ctx, candidates) if err != nil { return nil, nil, err } - if t, ok := FromPeerContext(ctx); ok { - t.Node = pick + p, ok := FromPeerContext(ctx) + if ok { + p.Node = wn.Raw() + } + return wn.Raw(), done, nil +} + +func (d *Default) Apply(nodes []Node) { + weightedNodes := make([]WeightedNode, 0, len(nodes)) + for _, v := range nodes { + weightedNodes = append(weightedNodes, d.NodeBuilder.Build(v)) } - return pick, doneFunc, nil + d.nodes.Store(weightedNodes) } type DefaultBuilder struct { @@ -60,8 +69,8 @@ type DefaultBuilder struct { } func (d *DefaultBuilder) Build() Selector { - return &DefaultSelector{ + return &Default{ NodeBuilder: d.Node, - Balancer: d.Balancer.Build(), + Balancer: d.Balancer.Builder(), } } diff --git a/selector/global.go b/selector/global.go index 69a5f85..bd100a7 100644 --- a/selector/global.go +++ b/selector/global.go @@ -1,9 +1,9 @@ package selector -var globalSelector = &wrapSelector{} -var _ Builder = (*wrapSelector)(nil) +var globalSelector = &warpSelector{} +var _ Builder = (*warpSelector)(nil) -type wrapSelector struct { +type warpSelector struct { Builder } diff --git a/selector/node/node.go b/selector/node/direct/direct.go similarity index 77% rename from selector/node/node.go rename to selector/node/direct/direct.go index 1c982d4..26a00d8 100644 --- a/selector/node/node.go +++ b/selector/node/direct/direct.go @@ -1,9 +1,8 @@ -package node +package direct import ( "context" "git.diulo.com/mogfee/kit/selector" - "google.golang.org/grpc/balancer" "sync/atomic" "time" ) @@ -12,23 +11,12 @@ const ( defaultWeight = 100 ) -var ( - _ selector.WeightedNode = (*Node)(nil) - _ selector.WeightedNodeBuilder = (*Builder)(nil) -) +var () type Node struct { selector.Node - lastPick int64 -} -type Builder struct { -} -func (b *Builder) Build(node selector.Node) selector.WeightedNode { - return &Node{ - Node: node, - lastPick: 0, - } + lastPick int64 } func (n *Node) Raw() selector.Node { @@ -45,9 +33,19 @@ func (n *Node) Weight() float64 { func (n *Node) Pick() selector.DoneFunc { now := time.Now().UnixNano() atomic.StoreInt64(&n.lastPick, now) - return func(ctx context.Context, di balancer.DoneInfo) {} + return func(ctx context.Context, di selector.DoneInfo) {} } func (n *Node) PickElapsed() time.Duration { return time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&n.lastPick)) } + +type Builder struct { +} + +func (b *Builder) Build(node selector.Node) selector.WeightedNode { + return &Node{ + Node: node, + lastPick: 0, + } +} diff --git a/selector/random/random.go b/selector/random/random.go index e2fec99..01d2f29 100644 --- a/selector/random/random.go +++ b/selector/random/random.go @@ -3,45 +3,43 @@ package random import ( "context" "git.diulo.com/mogfee/kit/selector" - "git.diulo.com/mogfee/kit/selector/node" - "sync/atomic" + "math/rand" ) -var _ selector.Balancer = (*Balancer)(nil) +type Option func(o *options) -func New() selector.Selector { - return NewBuilder().Build() +type options struct { } - type Balancer struct { - index atomic.Int64 } -func (r *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) { +func New(opts ...Option) selector.Selector { + return NewBuilder(opts...).Build() +} +func (b *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) { if len(nodes) == 0 { return nil, nil, selector.ErrNoAvailable } - //index := rand.Intn(len(nodes)) - index := r.index.Load() - if int(index) > len(nodes) { - index = 0 - } - r.index.Add(1) - selected := nodes[index] + cut := rand.Intn(len(nodes)) + selected := nodes[cut] d := selected.Pick() return selected, d, nil } -func NewBuilder() selector.Builder { +func NewBuilder(opts ...Option) selector.Builder { + var option options + for _, opt := range opts { + opt(&option) + } return &selector.DefaultBuilder{ - Node: &node.Builder{}, Balancer: &Builder{}, + Node: &direct.Builder{}, } } -type Builder struct { -} +// Builder is random builder +type Builder struct{} -func (b *Builder) Build() selector.Balancer { +func (b *Builder) Builder() selector.Balancer { return &Balancer{} } diff --git a/selector/selector.go b/selector/selector.go index 8915bd5..3ab0c95 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -3,14 +3,13 @@ package selector import ( "context" "git.diulo.com/mogfee/kit/errors" - "google.golang.org/grpc/balancer" ) var ErrNoAvailable = errors.ServiceUnavailable("no_available_node", "") type Selector interface { Rebalancer - Select(ctx context.Context, opts ...SelectOption) (selected WeightedNode, di DoneFunc, err error) + Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error) } type Rebalancer interface { Apply(nodes []Node) @@ -21,19 +20,19 @@ type Builder interface { type Node interface { Scheme() string Address() string - ServerName() string + ServiceName() string InitialWeight() *int64 Version() string Metadata() map[string]string } type DoneInfo struct { - Err error - ReplyMD ReplyMD + Err error + ReplyMD ReplyMD + BytesSent bool BytesReceived bool } type ReplyMD interface { Get(key string) string } - -type DoneFunc func(ctx context.Context, di balancer.DoneInfo) +type DoneFunc func(ctx context.Context, di DoneInfo) diff --git a/transport/http/client.go b/transport/http/client.go index 6a648e7..bdc7c45 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -1,14 +1,18 @@ package http import ( + "bytes" "context" "crypto/tls" + "fmt" "git.diulo.com/mogfee/kit/encoding" "git.diulo.com/mogfee/kit/errors" + "git.diulo.com/mogfee/kit/internal/host" "git.diulo.com/mogfee/kit/internal/httputil" "git.diulo.com/mogfee/kit/middleware" "git.diulo.com/mogfee/kit/registry" "git.diulo.com/mogfee/kit/selector" + "git.diulo.com/mogfee/kit/transport" "io" "net/http" "time" @@ -145,17 +149,144 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) { tr.TLSClientConfig = options.tlsConf } } - //insecure := options.tlsConf - //target, err := parseTarget(options.encoder, insecure) - //if err != nil { - // return nil, err - //} - //selector = selector.GlobalSelector().Build() - //var r *resolver - //if options.discovery != nil { - //} - - return nil, nil + insecure := options.tlsConf == nil + target, err := parseTarget(options.endpoint, insecure) + if err != nil { + return nil, err + } + selector := selector.GlobalSelector().Build() + var r *resolver + if options.discovery != nil { + if target.Scheme == "discovery" { + if r, err = newResolver(ctx, options.discovery, target, selector, options.block, insecure); err != nil { + return nil, fmt.Errorf("[http client] new resolver failed!err: %v", options.endpoint) + } + } else if _, _, err = host.ExtractHostPort(options.endpoint); err != nil { + return nil, fmt.Errorf("[http client] invalid endpoint format: %v", options.endpoint) + } + } + + return &Client{ + opts: options, + targe: target, + r: r, + cc: &http.Client{ + Timeout: options.timeout, + Transport: options.transport, + }, + insecure: insecure, + selector: selector, + }, nil +} +func (client *Client) Invoke(ctx context.Context, method, path string, args any, reply any, opts ...CallOption) error { + var ( + contentType string + body io.Reader + ) + c := defaultCallInfo(path) + for _, o := range opts { + if err := o.before(&c); err != nil { + return err + } + } + if args != nil { + data, err := client.opts.encoder(ctx, c.contentType, args) + if err != nil { + return err + } + contentType = c.contentType + body = bytes.NewReader(data) + } + url := fmt.Sprintf("%s://%s%s", client.targe.Scheme, client.targe.Authority, path) + req, err := http.NewRequest(method, url, body) + if err != nil { + return err + } + if contentType != "" { + req.Header.Set("Content-Type", c.contentType) + } + if client.opts.userAgent != "" { + req.Header.Set("User-Agent", client.opts.userAgent) + } + ctx = transport.NewClientContext(ctx, &Transport{ + endpoint: client.opts.endpoint, + operation: c.operation, + reqHeader: headerCarrier(req.Header), + request: req, + pathTemplate: c.pathTemplate, + }) + return client.invoke(ctx, req, args, reply, c, opts...) +} +func (client *Client) invoke(ctx context.Context, req *http.Request, args any, reply any, c callInfo, opts ...CallOption) error { + h := func(ctx context.Context, in any) (any, error) { + res, err := client.do(req.WithContext(ctx)) + if res != nil { + cs := csAttempt{res: res} + for _, o := range opts { + o.after(&c, &cs) + } + } + if err != nil { + return nil, err + } + defer res.Body.Close() + if err = client.opts.decoder(ctx, res, reply); err != nil { + return nil, err + } + return reply, nil + } + var p selector.Peer + ctx = selector.NewPeerContext(ctx, &p) + if len(client.opts.middleware) > 0 { + h = middleware.Chain(client.opts.middleware...)(h) + } + _, err := h(ctx, args) + return err +} +func (client *Client) Do(req *http.Request, opts ...CallOption) (*http.Response, error) { + c := defaultCallInfo(req.URL.Path) + for _, o := range opts { + if err := o.before(&c); err != nil { + return nil, err + } + } + return client.do(req) +} +func (client *Client) do(req *http.Request) (*http.Response, error) { + var done func(context.Context, selector.DoneInfo) + if client.r != nil { + var ( + err error + node selector.Node + ) + if node, done, err = client.selector.Select(req.Context(), selector.WithNodeFilter(client.opts.nodeFilters...)); err != nil { + return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error()) + } + if client.insecure { + req.URL.Scheme = "http" + } else { + req.URL.Scheme = "https" + } + req.URL.Host = node.Address() + req.Host = node.Address() + } + resp, err := client.cc.Do(req) + if err == nil { + err = client.opts.errorDecoder(req.Context(), resp) + } + if done != nil { + done(req.Context(), selector.DoneInfo{Err: err}) + } + if err != nil { + return nil, err + } + return resp, nil +} +func (client *Client) Close() error { + if client.r != nil { + return client.r.Close() + } + return nil } func DefaultrequestEncoder(ctx context.Context, contentType string, v any) ([]byte, error) { name := httputil.ContentSubtype(contentType)