package kit import ( "context" "errors" "git.diulo.com/mogfee/kit/log" "git.diulo.com/mogfee/kit/registry" "git.diulo.com/mogfee/kit/transport" "github.com/google/uuid" "golang.org/x/sync/errgroup" _ "git.diulo.com/mogfee/kit/encoding/form" _ "git.diulo.com/mogfee/kit/encoding/json" "os" "os/signal" "sync" "syscall" "time" ) type AppInfo interface { ID() string Name() string Version() string Metadata() map[string]string Endpoint() []string } type App struct { opts options ctx context.Context cancel func() mu sync.Mutex instance *registry.ServiceInstance } func New(opts ...Option) *App { o := options{ ctx: context.Background(), sigs: []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT}, registrarTimeout: 10 * time.Second, stopTimeout: 10 * time.Second, } if id, err := uuid.NewUUID(); err == nil { o.id = id.String() } for _, opt := range opts { opt(&o) } if o.logger != nil { log.SetLogger(o.logger) } ctx, cancel := context.WithCancel(o.ctx) return &App{ opts: o, ctx: ctx, cancel: cancel, } } // ID returns app instance id. func (a *App) ID() string { return a.opts.id } // Name returns service name. func (a *App) Name() string { return a.opts.name } // Version returns app version. func (a *App) Version() string { return a.opts.version } // Metadata returns service metadata. func (a *App) Metadata() map[string]string { return a.opts.metadata } // Endpoint returns endpoints. func (a *App) Endpoint() []string { if a.instance != nil { return a.instance.Endpoints } return nil } func (a *App) Run() error { instance, err := a.buildInstance() if err != nil { return err } a.mu.Lock() a.instance = instance a.mu.Unlock() sctx := NewContext(a.ctx, a) eg, ctx := errgroup.WithContext(sctx) wg := sync.WaitGroup{} for _, fn := range a.opts.beforeStart { if err = fn(sctx); err != nil { return err } } for _, srv := range a.opts.servers { srv := srv eg.Go(func() error { <-ctx.Done() stopCtx, cancel := context.WithTimeout(NewContext(a.opts.ctx, a), a.opts.stopTimeout) defer cancel() return srv.Stop(stopCtx) }) wg.Add(1) eg.Go(func() error { wg.Done() return srv.Start(sctx) }) } wg.Wait() if a.opts.registrar != nil { rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout) defer rcancel() if err = a.opts.registrar.Register(rctx, instance); err != nil { return err } } for _, fn := range a.opts.afterStart { if err = fn(sctx); err != nil { return err } } c := make(chan os.Signal, 1) signal.Notify(c, a.opts.sigs...) eg.Go(func() error { select { case <-ctx.Done(): return nil case <-c: return a.Stop() } }) if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { for _, fn := range a.opts.afterStop { err = fn(sctx) } } return err } func (a *App) Stop() (err error) { sctx := NewContext(a.ctx, a) for _, fn := range a.opts.beforeStop { err = fn(sctx) } a.mu.Lock() instance := a.instance a.mu.Unlock() if a.opts.registrar != nil && instance != nil { ctx, cancel := context.WithTimeout(NewContext(a.ctx, a), a.opts.stopTimeout) defer cancel() if err = a.opts.registrar.Deregister(ctx, instance); err != nil { return err } } if a.cancel != nil { a.cancel() } return err } func (a *App) buildInstance() (*registry.ServiceInstance, error) { endpoints := make([]string, 0, len(a.opts.endpoints)) for _, e := range a.opts.endpoints { endpoints = append(endpoints, e.String()) } if len(endpoints) == 0 { for _, srv := range a.opts.servers { if r, ok := srv.(transport.Endpointer); ok { e, err := r.Endpoint() if err != nil { return nil, err } endpoints = append(endpoints, e.String()) } } } return ®istry.ServiceInstance{ ID: a.opts.id, Name: a.opts.name, Version: a.opts.version, Metadata: a.opts.metadata, Endpoints: endpoints, }, nil } type appKey struct { } func NewContext(ctx context.Context, s AppInfo) context.Context { return context.WithValue(ctx, appKey{}, s) } func FromContext(ctx context.Context) (s AppInfo, ok bool) { s, ok = ctx.Value(appKey{}).(AppInfo) return }