You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
185 lines
4.0 KiB
185 lines
4.0 KiB
package etcd |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"git.diulo.com/mogfee/kit/registry" |
|
clientv3 "go.etcd.io/etcd/client/v3" |
|
"math/rand" |
|
"time" |
|
) |
|
|
|
type Option func(o *options) |
|
|
|
type options struct { |
|
ctx context.Context |
|
namespace string |
|
ttl time.Duration |
|
maxRetry int |
|
} |
|
|
|
func Context(ctx context.Context) Option { |
|
return func(o *options) { |
|
o.ctx = ctx |
|
} |
|
} |
|
func Namespace(ns string) Option { |
|
return func(o *options) { |
|
o.namespace = ns |
|
} |
|
} |
|
func RegisterTTL(ttl time.Duration) Option { |
|
return func(o *options) { |
|
o.ttl = ttl |
|
} |
|
} |
|
func MaxRetry(num int) Option { |
|
return func(o *options) { |
|
o.maxRetry = num |
|
} |
|
} |
|
|
|
type Registry struct { |
|
opts *options |
|
client *clientv3.Client |
|
kv clientv3.KV |
|
lease clientv3.Lease |
|
} |
|
|
|
func New(client *clientv3.Client, opts ...Option) *Registry { |
|
op := &options{ |
|
ctx: context.Background(), |
|
namespace: "/microservices", |
|
ttl: time.Second * 5, |
|
maxRetry: 5, |
|
} |
|
for _, o := range opts { |
|
o(op) |
|
} |
|
return &Registry{ |
|
opts: op, |
|
client: client, |
|
kv: clientv3.NewKV(client), |
|
} |
|
} |
|
|
|
func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error { |
|
key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID) |
|
value, err := marshal(service) |
|
if err != nil { |
|
return err |
|
} |
|
if r.lease != nil { |
|
r.lease.Close() |
|
} |
|
r.lease = clientv3.NewLease(r.client) |
|
leaseId, err := r.registerWithKV(ctx, key, value) |
|
if err != nil { |
|
return err |
|
} |
|
go r.heartBeat(r.opts.ctx, leaseId, key, value) |
|
return nil |
|
} |
|
|
|
func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error { |
|
key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID) |
|
_, err := r.client.Delete(ctx, key) |
|
return err |
|
} |
|
func (r *Registry) GetService(ctx context.Context, serverName string) ([]*registry.ServiceInstance, error) { |
|
key := fmt.Sprintf("%s/%s", r.opts.namespace, serverName) |
|
resp, err := r.kv.Get(ctx, key, clientv3.WithPrefix()) |
|
if err != nil { |
|
return nil, err |
|
} |
|
items := make([]*registry.ServiceInstance, 0) |
|
for _, kv := range resp.Kvs { |
|
si, err := unmarshal(kv.Value) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if si.Name != serverName { |
|
continue |
|
} |
|
items = append(items, si) |
|
} |
|
return items, nil |
|
} |
|
|
|
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { |
|
key := fmt.Sprintf("%s/%s", r.opts.namespace, serviceName) |
|
return newWatcher(ctx, key, serviceName, r.client) |
|
} |
|
func (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) { |
|
grant, err := r.client.Grant(ctx, int64(r.opts.ttl.Seconds())) |
|
if err != nil { |
|
return 0, err |
|
} |
|
_, err = r.client.Put(ctx, key, value, clientv3.WithLease(grant.ID)) |
|
if err != nil { |
|
return 0, err |
|
} |
|
return grant.ID, nil |
|
} |
|
func (r *Registry) heartBeat(ctx context.Context, leaseId clientv3.LeaseID, key string, value string) { |
|
curLeaseId := leaseId |
|
kac, err := r.client.KeepAlive(ctx, leaseId) |
|
if err != nil { |
|
curLeaseId = 0 |
|
} |
|
rand.Seed(time.Now().Unix()) |
|
for { |
|
if curLeaseId == 0 { |
|
var retreat []int |
|
for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ { |
|
if ctx.Err() != nil { |
|
return |
|
} |
|
|
|
idChan := make(chan clientv3.LeaseID, 1) |
|
errChan := make(chan error, 1) |
|
cancelCtx, cancel := context.WithCancel(ctx) |
|
go func() { |
|
defer cancel() |
|
id, registerErr := r.registerWithKV(cancelCtx, key, value) |
|
if registerErr != nil { |
|
errChan <- registerErr |
|
} else { |
|
idChan <- id |
|
} |
|
}() |
|
|
|
select { |
|
case <-time.After(3 * time.Second): |
|
cancel() |
|
continue |
|
case <-errChan: |
|
continue |
|
case curLeaseId = <-idChan: |
|
} |
|
kac, err = r.client.KeepAlive(ctx, curLeaseId) |
|
if err != nil { |
|
break |
|
} |
|
retreat := append(retreat, 1<<retryCnt) |
|
time.Sleep(time.Duration(retreat[rand.Intn(len(retreat))]) * time.Second) |
|
} |
|
if _, ok := <-kac; !ok { |
|
return |
|
} |
|
} |
|
|
|
select { |
|
case _, ok := <-kac: |
|
if !ok { |
|
if ctx.Err() != nil { |
|
return |
|
} |
|
curLeaseId = 0 |
|
continue |
|
} |
|
case <-r.opts.ctx.Done(): |
|
return |
|
} |
|
} |
|
}
|
|
|