package xserver import ( "context" "github.com/nsqio/go-nsq" "sync" ) func NewConsumer(nsqHost string) *Consumer { return &Consumer{ consumers: make([]*nsq.Consumer, 0), host: nsqHost, } } type Consumer struct { consumers []*nsq.Consumer host string wg sync.WaitGroup } func (c *Consumer) Name() string { return "consumer" } func (s *Consumer) Start() error { for _, v := range s.consumers { if err := v.ConnectToNSQD(s.host); err != nil { return err } } return nil } func (s *Consumer) Shutdown(ctx context.Context) error { for _, v := range s.consumers { v.Stop() } s.wg.Wait() return nil } func (s *Consumer) Register(topic, channel string, fun func(msg *nsq.Message) error) error { consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig()) if err != nil { return err } consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { s.wg.Add(1) defer s.wg.Done() return fun(msg) })) s.consumers = append(s.consumers, consumer) return nil }