You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
4.0 KiB

2 years ago
package etcd
import (
"context"
"fmt"
"git.diulo.com/mogfee/kit/registry"
clientv3 "go.etcd.io/etcd/client/v3"
"math/rand"
"time"
)
// Option is a functional option: a callback that adjusts the registry's
// internal settings. New applies each supplied Option in order.
type Option func(opts *options)
// options collects the tunable settings of a Registry.
type options struct {
	// ctx is the context handed to the heartbeat goroutine started by Register.
	ctx context.Context
	// namespace is the key prefix under which service instances are stored.
	namespace string
	// ttl is the lease time-to-live; it is converted to whole seconds when a
	// lease is granted.
	ttl time.Duration
	// maxRetry bounds the number of re-registration attempts made in one
	// heartbeat recovery round.
	maxRetry int
}
// Context returns an Option that stores ctx in the registry options,
// replacing the default set by New.
func Context(ctx context.Context) Option {
	return func(opts *options) { opts.ctx = ctx }
}
// Namespace returns an Option that sets the key prefix under which service
// instances are stored.
func Namespace(ns string) Option {
	return func(opts *options) { opts.namespace = ns }
}
// RegisterTTL returns an Option that sets the time-to-live used when a
// registration lease is granted.
func RegisterTTL(ttl time.Duration) Option {
	return func(opts *options) { opts.ttl = ttl }
}
// MaxRetry returns an Option that sets how many re-registration attempts the
// heartbeat makes in one recovery round.
func MaxRetry(num int) Option {
	return func(opts *options) { opts.maxRetry = num }
}
// Registry is a service registry backed by an etcd v3 client.
type Registry struct {
	// opts holds the settings resolved by New.
	opts *options
	// client is the etcd client used for leases, puts, deletes and watches.
	client *clientv3.Client
	// kv is the key-value API derived from client; GetService reads through it.
	kv clientv3.KV
	// lease is the lease client most recently created by Register.
	lease clientv3.Lease
}
// New builds a Registry on top of client.
//
// Defaults, each overridable through opts: a background context, the
// "/microservices" namespace, a 5-second TTL and 5 retries. Options are
// applied in the order given.
func New(client *clientv3.Client, opts ...Option) *Registry {
	cfg := &options{
		ctx:       context.Background(),
		namespace: "/microservices",
		ttl:       5 * time.Second,
		maxRetry:  5,
	}
	for i := range opts {
		opts[i](cfg)
	}

	reg := &Registry{opts: cfg, client: client}
	reg.kv = clientv3.NewKV(client)
	return reg
}
// Register stores service under "<namespace>/<name>/<id>" bound to a freshly
// granted lease, then starts a background heartbeat that keeps the lease
// alive. It returns the marshalling or registration error, if any; the
// heartbeat is only started on success.
func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
	path := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
	payload, err := marshal(service)
	if err != nil {
		return err
	}

	// Drop the lease client left over from a previous Register call; its
	// Close error is deliberately ignored.
	if prev := r.lease; prev != nil {
		_ = prev.Close()
	}
	r.lease = clientv3.NewLease(r.client)

	id, err := r.registerWithKV(ctx, path, payload)
	if err != nil {
		return err
	}

	// The heartbeat runs under the registry-level context rather than the
	// per-call ctx.
	go r.heartBeat(r.opts.ctx, id, path, payload)
	return nil
}
// Deregister deletes the key "<namespace>/<name>/<id>" belonging to service
// and returns the error reported by the delete, if any.
func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
	path := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
	if _, err := r.client.Delete(ctx, path); err != nil {
		return err
	}
	return nil
}
// GetService returns every stored instance whose name equals serverName.
//
// It reads all keys starting with "<namespace>/<serverName>" and decodes each
// value. A decode failure aborts the call and returns that error. The result
// is a non-nil, possibly empty, slice.
func (r *Registry) GetService(ctx context.Context, serverName string) ([]*registry.ServiceInstance, error) {
	prefix := fmt.Sprintf("%s/%s", r.opts.namespace, serverName)
	resp, err := r.kv.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	instances := []*registry.ServiceInstance{}
	for i := range resp.Kvs {
		inst, decodeErr := unmarshal(resp.Kvs[i].Value)
		if decodeErr != nil {
			return nil, decodeErr
		}
		// The prefix has no trailing slash, so it can also match services
		// whose name merely starts with serverName; keep exact matches only.
		if inst.Name == serverName {
			instances = append(instances, inst)
		}
	}
	return instances, nil
}
// Watch creates a watcher for serviceName rooted at
// "<namespace>/<serviceName>" and returns whatever newWatcher yields.
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
	prefix := r.opts.namespace + "/" + serviceName
	return newWatcher(ctx, prefix, serviceName, r.client)
}
// registerWithKV grants a lease lasting the configured TTL (in whole seconds)
// and puts key/value under it. It returns the ID of the granted lease, or a
// zero ID together with the error from the grant or the put.
func (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) {
	ttlSeconds := int64(r.opts.ttl.Seconds())
	lease, err := r.client.Grant(ctx, ttlSeconds)
	if err != nil {
		return 0, err
	}
	if _, err := r.client.Put(ctx, key, value, clientv3.WithLease(lease.ID)); err != nil {
		return 0, err
	}
	return lease.ID, nil
}
// heartBeat keeps the lease behind key alive and re-registers key/value when
// the keep-alive stream is lost. It is intended to run in its own goroutine.
//
// leaseID is the lease the key was originally stored under. key and value are
// passed unchanged to registerWithKV on every re-registration attempt.
//
// The function returns when ctx or the registry-level context is done, or
// when a recovery round ends without a working keep-alive stream.
func (r *Registry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key string, value string) {
	curLeaseID := leaseID
	kac, err := r.client.KeepAlive(ctx, leaseID)
	if err != nil {
		// A zero lease ID marks "not registered" and forces the recovery
		// path at the top of the loop below.
		curLeaseID = 0
	}
	rand.Seed(time.Now().Unix())

	for {
		if curLeaseID == 0 {
			// Backoff candidates, in seconds, accumulated over the failed
			// keep-alive attempts of this recovery round.
			var retreat []int
			for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ {
				if ctx.Err() != nil {
					return
				}

				// Register in a helper goroutine so a stuck call cannot block
				// this loop forever. Both channels are buffered, so the helper
				// can always deliver its result and exit, even after the
				// timeout branch below has moved on.
				idChan := make(chan clientv3.LeaseID, 1)
				errChan := make(chan error, 1)
				cancelCtx, cancel := context.WithCancel(ctx)
				go func() {
					defer cancel()
					id, registerErr := r.registerWithKV(cancelCtx, key, value)
					if registerErr != nil {
						errChan <- registerErr
						return
					}
					idChan <- id
				}()

				select {
				case <-time.After(3 * time.Second):
					// Abort the in-flight registration and try again.
					cancel()
					continue
				case <-errChan:
					continue
				case curLeaseID = <-idChan:
				}

				kac, err = r.client.KeepAlive(ctx, curLeaseID)
				if err == nil {
					// Registered and keep-alive established: recovery done.
					break
				}

				// Keep-alive failed: wait before the next attempt, picking the
				// delay at random among the powers of two collected so far.
				// Assign to the outer slice (no :=) so candidates accumulate.
				retreat = append(retreat, 1<<retryCnt)
				delay := time.Duration(retreat[rand.Intn(len(retreat))]) * time.Second
				timer := time.NewTimer(delay)
				select {
				case <-timer.C:
				case <-ctx.Done():
					// Do not sit out the backoff once the caller has cancelled.
					timer.Stop()
					return
				}
			}
			// Wait for the first keep-alive response. A closed channel means
			// the round did not end with a working stream, so give up.
			if _, ok := <-kac; !ok {
				return
			}
		}

		select {
		case _, ok := <-kac:
			if !ok {
				if ctx.Err() != nil {
					// The stream ended because the context was cancelled.
					return
				}
				// The stream was lost for another reason: re-register.
				curLeaseID = 0
				continue
			}
		case <-r.opts.ctx.Done():
			return
		}
	}
}