销售分配后是否付费

master
李伟乐 1 year ago
parent 8c398ecd47
commit 08e2660c5e
  1. 22
      example/client_test.go
  2. 5
      selector/balancer.go
  3. 6
      selector/default_node.go
  4. 67
      selector/default_selector.go
  5. 6
      selector/global.go
  6. 30
      selector/node/direct/direct.go
  7. 38
      selector/random/random.go
  8. 9
      selector/selector.go
  9. 151
      transport/http/client.go

@ -0,0 +1,22 @@
package main
import (
"context"
"fmt"
"git.diulo.com/mogfee/kit/transport/http"
"testing"
)
func TestCall(t *testing.T) {
var out UserAddRequest
path := "/api/v1/answer/listCategory"
client := http.Client{}
ctx := context.Background()
in := UserAddRequest{Name: "tom"}
err := client.Invoke(ctx, "POST", path, in, &out)
if err != nil {
t.Error(err)
}
fmt.Println(out)
}

@ -6,10 +6,10 @@ import (
) )
type Balancer interface { type Balancer interface {
Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, di DoneFunc, err error) Pick(ctx context.Context, nodes []WeightedNode) (WeightedNode, DoneFunc, error)
} }
type BalancerBuilder interface { type BalancerBuilder interface {
Build() Balancer Builder() Balancer
} }
type WeightedNode interface { type WeightedNode interface {
Node Node
@ -18,7 +18,6 @@ type WeightedNode interface {
Pick() DoneFunc Pick() DoneFunc
PickElapsed() time.Duration PickElapsed() time.Duration
} }
type WeightedNodeBuilder interface { type WeightedNodeBuilder interface {
Build(Node) WeightedNode Build(Node) WeightedNode
} }

@ -22,7 +22,7 @@ func (d *DefaultNode) Address() string {
return d.address return d.address
} }
func (d *DefaultNode) ServerName() string { func (d *DefaultNode) ServiceName() string {
return d.name return d.name
} }
@ -39,7 +39,7 @@ func (d *DefaultNode) Metadata() map[string]string {
} }
func NewNode(scheme string, address string, ins *registry.ServiceInstance) Node { func NewNode(scheme string, address string, ins *registry.ServiceInstance) Node {
node := DefaultNode{ node := &DefaultNode{
scheme: scheme, scheme: scheme,
address: address, address: address,
} }
@ -53,5 +53,5 @@ func NewNode(scheme string, address string, ins *registry.ServiceInstance) Node
} }
} }
} }
return &node return node
} }

@ -5,53 +5,62 @@ import (
"sync/atomic" "sync/atomic"
) )
type DefaultSelector struct { var (
nodes atomic.Value _ Rebalancer = (*Default)(nil)
Balancer Balancer _ Builder = (*DefaultBuilder)(nil)
)
type Default struct {
NodeBuilder WeightedNodeBuilder NodeBuilder WeightedNodeBuilder
} Balancer Balancer
func (d *DefaultSelector) Apply(nodes []Node) { nodes atomic.Value
newNodes := make([]WeightedNode, 0, len(nodes))
for _, v := range nodes {
newNodes = append(newNodes, d.NodeBuilder.Build(v))
}
d.nodes.Store(newNodes)
} }
func (d *DefaultSelector) Select(ctx context.Context, opts ...SelectOption) (selected WeightedNode, di DoneFunc, err error) { func (d *Default) Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error) {
var options SelectOptions var (
for _, o := range opts { options SelectOptions
o(&options) candidates []WeightedNode
} )
nodes, ok := d.nodes.Load().([]WeightedNode) nodes, ok := d.nodes.Load().([]WeightedNode)
if !ok { if !ok {
return nil, nil, ErrNoAvailable return nil, nil, ErrNoAvailable
} }
conditions := make([]WeightedNode, 0) for _, o := range opts {
o(&options)
}
if len(options.NodeFilters) > 0 { if len(options.NodeFilters) > 0 {
newNodes := make([]Node, 0) newNodes := make([]Node, len(nodes))
for _, v := range nodes { for i, wc := range nodes {
newNodes = append(newNodes, v) newNodes[i] = wc
} }
for _, filter := range options.NodeFilters { for _, filter := range options.NodeFilters {
newNodes = filter(ctx, newNodes) newNodes = filter(ctx, newNodes)
} }
conditions = make([]WeightedNode, 0, len(newNodes)) candidates = make([]WeightedNode, len(newNodes))
for _, v := range newNodes { for i, n := range newNodes {
conditions = append(conditions, v.(WeightedNode)) candidates[i] = n.(WeightedNode)
} }
} else { } else {
conditions = nodes candidates = nodes
} }
pick, doneFunc, err := d.Balancer.Pick(ctx, conditions) wn, done, err := d.Balancer.Pick(ctx, candidates)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if t, ok := FromPeerContext(ctx); ok { p, ok := FromPeerContext(ctx)
t.Node = pick if ok {
p.Node = wn.Raw()
}
return wn.Raw(), done, nil
}
func (d *Default) Apply(nodes []Node) {
weightedNodes := make([]WeightedNode, 0, len(nodes))
for _, v := range nodes {
weightedNodes = append(weightedNodes, d.NodeBuilder.Build(v))
} }
return pick, doneFunc, nil d.nodes.Store(weightedNodes)
} }
type DefaultBuilder struct { type DefaultBuilder struct {
@ -60,8 +69,8 @@ type DefaultBuilder struct {
} }
func (d *DefaultBuilder) Build() Selector { func (d *DefaultBuilder) Build() Selector {
return &DefaultSelector{ return &Default{
NodeBuilder: d.Node, NodeBuilder: d.Node,
Balancer: d.Balancer.Build(), Balancer: d.Balancer.Builder(),
} }
} }

@ -1,9 +1,9 @@
package selector package selector
var globalSelector = &wrapSelector{} var globalSelector = &warpSelector{}
var _ Builder = (*wrapSelector)(nil) var _ Builder = (*warpSelector)(nil)
type wrapSelector struct { type warpSelector struct {
Builder Builder
} }

@ -1,9 +1,8 @@
package node package direct
import ( import (
"context" "context"
"git.diulo.com/mogfee/kit/selector" "git.diulo.com/mogfee/kit/selector"
"google.golang.org/grpc/balancer"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@ -12,23 +11,12 @@ const (
defaultWeight = 100 defaultWeight = 100
) )
var ( var ()
_ selector.WeightedNode = (*Node)(nil)
_ selector.WeightedNodeBuilder = (*Builder)(nil)
)
type Node struct { type Node struct {
selector.Node selector.Node
lastPick int64
}
type Builder struct {
}
func (b *Builder) Build(node selector.Node) selector.WeightedNode { lastPick int64
return &Node{
Node: node,
lastPick: 0,
}
} }
func (n *Node) Raw() selector.Node { func (n *Node) Raw() selector.Node {
@ -45,9 +33,19 @@ func (n *Node) Weight() float64 {
func (n *Node) Pick() selector.DoneFunc { func (n *Node) Pick() selector.DoneFunc {
now := time.Now().UnixNano() now := time.Now().UnixNano()
atomic.StoreInt64(&n.lastPick, now) atomic.StoreInt64(&n.lastPick, now)
return func(ctx context.Context, di balancer.DoneInfo) {} return func(ctx context.Context, di selector.DoneInfo) {}
} }
func (n *Node) PickElapsed() time.Duration { func (n *Node) PickElapsed() time.Duration {
return time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&n.lastPick)) return time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&n.lastPick))
} }
type Builder struct {
}
func (b *Builder) Build(node selector.Node) selector.WeightedNode {
return &Node{
Node: node,
lastPick: 0,
}
}

@ -3,45 +3,43 @@ package random
import ( import (
"context" "context"
"git.diulo.com/mogfee/kit/selector" "git.diulo.com/mogfee/kit/selector"
"git.diulo.com/mogfee/kit/selector/node" "math/rand"
"sync/atomic"
) )
var _ selector.Balancer = (*Balancer)(nil) type Option func(o *options)
func New() selector.Selector { type options struct {
return NewBuilder().Build()
} }
type Balancer struct { type Balancer struct {
index atomic.Int64
} }
func (r *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) { func New(opts ...Option) selector.Selector {
return NewBuilder(opts...).Build()
}
func (b *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {
if len(nodes) == 0 { if len(nodes) == 0 {
return nil, nil, selector.ErrNoAvailable return nil, nil, selector.ErrNoAvailable
} }
//index := rand.Intn(len(nodes)) cut := rand.Intn(len(nodes))
index := r.index.Load() selected := nodes[cut]
if int(index) > len(nodes) {
index = 0
}
r.index.Add(1)
selected := nodes[index]
d := selected.Pick() d := selected.Pick()
return selected, d, nil return selected, d, nil
} }
func NewBuilder() selector.Builder { func NewBuilder(opts ...Option) selector.Builder {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.DefaultBuilder{ return &selector.DefaultBuilder{
Node: &node.Builder{},
Balancer: &Builder{}, Balancer: &Builder{},
Node: &direct.Builder{},
} }
} }
type Builder struct { // Builder is random builder
} type Builder struct{}
func (b *Builder) Build() selector.Balancer { func (b *Builder) Builder() selector.Balancer {
return &Balancer{} return &Balancer{}
} }

@ -3,14 +3,13 @@ package selector
import ( import (
"context" "context"
"git.diulo.com/mogfee/kit/errors" "git.diulo.com/mogfee/kit/errors"
"google.golang.org/grpc/balancer"
) )
var ErrNoAvailable = errors.ServiceUnavailable("no_available_node", "") var ErrNoAvailable = errors.ServiceUnavailable("no_available_node", "")
type Selector interface { type Selector interface {
Rebalancer Rebalancer
Select(ctx context.Context, opts ...SelectOption) (selected WeightedNode, di DoneFunc, err error) Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error)
} }
type Rebalancer interface { type Rebalancer interface {
Apply(nodes []Node) Apply(nodes []Node)
@ -21,7 +20,7 @@ type Builder interface {
type Node interface { type Node interface {
Scheme() string Scheme() string
Address() string Address() string
ServerName() string ServiceName() string
InitialWeight() *int64 InitialWeight() *int64
Version() string Version() string
Metadata() map[string]string Metadata() map[string]string
@ -29,11 +28,11 @@ type Node interface {
type DoneInfo struct { type DoneInfo struct {
Err error Err error
ReplyMD ReplyMD ReplyMD ReplyMD
BytesSent bool BytesSent bool
BytesReceived bool BytesReceived bool
} }
type ReplyMD interface { type ReplyMD interface {
Get(key string) string Get(key string) string
} }
type DoneFunc func(ctx context.Context, di DoneInfo)
type DoneFunc func(ctx context.Context, di balancer.DoneInfo)

@ -1,14 +1,18 @@
package http package http
import ( import (
"bytes"
"context" "context"
"crypto/tls" "crypto/tls"
"fmt"
"git.diulo.com/mogfee/kit/encoding" "git.diulo.com/mogfee/kit/encoding"
"git.diulo.com/mogfee/kit/errors" "git.diulo.com/mogfee/kit/errors"
"git.diulo.com/mogfee/kit/internal/host"
"git.diulo.com/mogfee/kit/internal/httputil" "git.diulo.com/mogfee/kit/internal/httputil"
"git.diulo.com/mogfee/kit/middleware" "git.diulo.com/mogfee/kit/middleware"
"git.diulo.com/mogfee/kit/registry" "git.diulo.com/mogfee/kit/registry"
"git.diulo.com/mogfee/kit/selector" "git.diulo.com/mogfee/kit/selector"
"git.diulo.com/mogfee/kit/transport"
"io" "io"
"net/http" "net/http"
"time" "time"
@ -145,17 +149,144 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
tr.TLSClientConfig = options.tlsConf tr.TLSClientConfig = options.tlsConf
} }
} }
//insecure := options.tlsConf insecure := options.tlsConf == nil
//target, err := parseTarget(options.encoder, insecure) target, err := parseTarget(options.endpoint, insecure)
//if err != nil { if err != nil {
// return nil, err return nil, err
//} }
//selector = selector.GlobalSelector().Build() selector := selector.GlobalSelector().Build()
//var r *resolver var r *resolver
//if options.discovery != nil { if options.discovery != nil {
//} if target.Scheme == "discovery" {
if r, err = newResolver(ctx, options.discovery, target, selector, options.block, insecure); err != nil {
return nil, fmt.Errorf("[http client] new resolver failed!err: %v", options.endpoint)
}
} else if _, _, err = host.ExtractHostPort(options.endpoint); err != nil {
return nil, fmt.Errorf("[http client] invalid endpoint format: %v", options.endpoint)
}
}
return nil, nil return &Client{
opts: options,
targe: target,
r: r,
cc: &http.Client{
Timeout: options.timeout,
Transport: options.transport,
},
insecure: insecure,
selector: selector,
}, nil
}
func (client *Client) Invoke(ctx context.Context, method, path string, args any, reply any, opts ...CallOption) error {
var (
contentType string
body io.Reader
)
c := defaultCallInfo(path)
for _, o := range opts {
if err := o.before(&c); err != nil {
return err
}
}
if args != nil {
data, err := client.opts.encoder(ctx, c.contentType, args)
if err != nil {
return err
}
contentType = c.contentType
body = bytes.NewReader(data)
}
url := fmt.Sprintf("%s://%s%s", client.targe.Scheme, client.targe.Authority, path)
req, err := http.NewRequest(method, url, body)
if err != nil {
return err
}
if contentType != "" {
req.Header.Set("Content-Type", c.contentType)
}
if client.opts.userAgent != "" {
req.Header.Set("User-Agent", client.opts.userAgent)
}
ctx = transport.NewClientContext(ctx, &Transport{
endpoint: client.opts.endpoint,
operation: c.operation,
reqHeader: headerCarrier(req.Header),
request: req,
pathTemplate: c.pathTemplate,
})
return client.invoke(ctx, req, args, reply, c, opts...)
}
func (client *Client) invoke(ctx context.Context, req *http.Request, args any, reply any, c callInfo, opts ...CallOption) error {
h := func(ctx context.Context, in any) (any, error) {
res, err := client.do(req.WithContext(ctx))
if res != nil {
cs := csAttempt{res: res}
for _, o := range opts {
o.after(&c, &cs)
}
}
if err != nil {
return nil, err
}
defer res.Body.Close()
if err = client.opts.decoder(ctx, res, reply); err != nil {
return nil, err
}
return reply, nil
}
var p selector.Peer
ctx = selector.NewPeerContext(ctx, &p)
if len(client.opts.middleware) > 0 {
h = middleware.Chain(client.opts.middleware...)(h)
}
_, err := h(ctx, args)
return err
}
func (client *Client) Do(req *http.Request, opts ...CallOption) (*http.Response, error) {
c := defaultCallInfo(req.URL.Path)
for _, o := range opts {
if err := o.before(&c); err != nil {
return nil, err
}
}
return client.do(req)
}
func (client *Client) do(req *http.Request) (*http.Response, error) {
var done func(context.Context, selector.DoneInfo)
if client.r != nil {
var (
err error
node selector.Node
)
if node, done, err = client.selector.Select(req.Context(), selector.WithNodeFilter(client.opts.nodeFilters...)); err != nil {
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error())
}
if client.insecure {
req.URL.Scheme = "http"
} else {
req.URL.Scheme = "https"
}
req.URL.Host = node.Address()
req.Host = node.Address()
}
resp, err := client.cc.Do(req)
if err == nil {
err = client.opts.errorDecoder(req.Context(), resp)
}
if done != nil {
done(req.Context(), selector.DoneInfo{Err: err})
}
if err != nil {
return nil, err
}
return resp, nil
}
func (client *Client) Close() error {
if client.r != nil {
return client.r.Close()
}
return nil
} }
func DefaultrequestEncoder(ctx context.Context, contentType string, v any) ([]byte, error) { func DefaultrequestEncoder(ctx context.Context, contentType string, v any) ([]byte, error) {
name := httputil.ContentSubtype(contentType) name := httputil.ContentSubtype(contentType)

Loading…
Cancel
Save