// Package etcd implements a service registry backed by etcd.
package etcd

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"git.diulo.com/mogfee/kit/registry"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// Option configures a Registry created by New.
type Option func(o *options)

// options holds the tunable settings of a Registry.
type options struct {
	ctx       context.Context // parent context of every heartbeat goroutine
	namespace string          // key prefix under which services are stored
	ttl       time.Duration   // lease time-to-live for a registration
	maxRetry  int             // upper bound of the heartbeat re-registration loop
}

// Context sets the parent context of the heartbeat goroutines started by
// Register. The default is context.Background().
func Context(ctx context.Context) Option {
	return func(o *options) { o.ctx = ctx }
}

// Namespace sets the key prefix under which services are stored.
// The default is "/microservices".
func Namespace(ns string) Option {
	return func(o *options) { o.namespace = ns }
}

// RegisterTTL sets the time-to-live of the lease attached to a registration.
// The default is five seconds. The value is handed to etcd in whole seconds.
func RegisterTTL(ttl time.Duration) Option {
	return func(o *options) { o.ttl = ttl }
}

// MaxRetry sets how many times the heartbeat tries to re-register a service
// in one retry round. The default is 5.
func MaxRetry(num int) Option {
	return func(o *options) { o.maxRetry = num }
}

// Registry is a service registry that stores instances in etcd.
//
// Register and Deregister are not synchronized with each other; callers must
// not invoke them concurrently on the same Registry.
type Registry struct {
	opts   *options
	client *clientv3.Client
	kv     clientv3.KV
	lease  clientv3.Lease
	// cancels maps a registration key to the function that stops its
	// heartbeat goroutine.
	cancels map[string]context.CancelFunc
}

// New returns a Registry that talks to etcd through client, with the given
// options applied on top of the defaults.
func New(client *clientv3.Client, opts ...Option) *Registry {
	op := &options{
		ctx:       context.Background(),
		namespace: "/microservices",
		ttl:       time.Second * 5,
		maxRetry:  5,
	}
	for _, o := range opts {
		o(op)
	}
	return &Registry{
		opts:    op,
		client:  client,
		kv:      clientv3.NewKV(client),
		cancels: make(map[string]context.CancelFunc),
	}
}

// closeLease releases the lease handle created by the last Register, if any.
func (r *Registry) closeLease() {
	if r.lease == nil {
		return
	}
	// Best effort: the handle is only being released, there is nothing
	// useful to do with a failure here.
	_ = r.lease.Close()
	r.lease = nil
}

// stopHeartBeat cancels the heartbeat goroutine running for key, if any.
func (r *Registry) stopHeartBeat(key string) {
	if cancel, ok := r.cancels[key]; ok {
		cancel()
		delete(r.cancels, key)
	}
}

// Register stores service under "<namespace>/<name>/<id>" with a lease and
// starts a heartbeat goroutine that keeps the registration alive. The
// heartbeat runs until Deregister is called for the same service, the service
// is registered again, or the context given via the Context option is done.
func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
	key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
	value, err := marshal(service)
	if err != nil {
		return err
	}
	r.closeLease()
	r.lease = clientv3.NewLease(r.client)
	leaseID, err := r.registerWithKV(ctx, key, value)
	if err != nil {
		return err
	}
	// Replace a previous heartbeat for the same key only after the new
	// registration succeeded, so a failed re-register keeps the old one alive.
	r.stopHeartBeat(key)
	hbCtx, cancel := context.WithCancel(r.opts.ctx)
	if r.cancels == nil {
		// Registry was not built by New.
		r.cancels = make(map[string]context.CancelFunc)
	}
	r.cancels[key] = cancel
	go r.heartBeat(hbCtx, leaseID, key, value)
	return nil
}

// Deregister stops the heartbeat of service and removes its key from etcd.
// The heartbeat is stopped first so that it can neither keep the lease alive
// nor put the key back after it has been deleted.
func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
	key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
	r.stopHeartBeat(key)
	r.closeLease()
	_, err := r.client.Delete(ctx, key)
	return err
}

// GetService returns the instances stored under "<namespace>/<serverName>".
// The lookup is a prefix query, so entries of other services whose names
// merely start with serverName can be returned by etcd as well; they are
// filtered out by comparing the decoded instance name.
func (r *Registry) GetService(ctx context.Context, serverName string) ([]*registry.ServiceInstance, error) {
	key := fmt.Sprintf("%s/%s", r.opts.namespace, serverName)
	resp, err := r.kv.Get(ctx, key, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}
	items := make([]*registry.ServiceInstance, 0)
	for _, kv := range resp.Kvs {
		si, err := unmarshal(kv.Value)
		if err !=
nil { return nil, err } if si.Name != serverName { continue } items = append(items, si) } return items, nil } func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { key := fmt.Sprintf("%s/%s", r.opts.namespace, serviceName) return newWatcher(ctx, key, serviceName, r.client) } func (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) { grant, err := r.client.Grant(ctx, int64(r.opts.ttl.Seconds())) if err != nil { return 0, err } _, err = r.client.Put(ctx, key, value, clientv3.WithLease(grant.ID)) if err != nil { return 0, err } return grant.ID, nil } func (r *Registry) heartBeat(ctx context.Context, leaseId clientv3.LeaseID, key string, value string) { curLeaseId := leaseId kac, err := r.client.KeepAlive(ctx, leaseId) if err != nil { curLeaseId = 0 } rand.Seed(time.Now().Unix()) for { if curLeaseId == 0 { var retreat []int for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ { if ctx.Err() != nil { return } idChan := make(chan clientv3.LeaseID, 1) errChan := make(chan error, 1) cancelCtx, cancel := context.WithCancel(ctx) go func() { defer cancel() id, registerErr := r.registerWithKV(cancelCtx, key, value) if registerErr != nil { errChan <- registerErr } else { idChan <- id } }() select { case <-time.After(3 * time.Second): cancel() continue case <-errChan: continue case curLeaseId = <-idChan: } kac, err = r.client.KeepAlive(ctx, curLeaseId) if err != nil { break } retreat := append(retreat, 1<