package consumer import ( "github.com/nsqio/go-nsq" "sync" ) type baseConsumerService struct { consumers []*BaseConsumer } func New() *baseConsumerService { return &baseConsumerService{ consumers: []*BaseConsumer{}, } } func (s *baseConsumerService) Register(b *BaseConsumer) { s.consumers = append(s.consumers, b) } func (s *baseConsumerService) Run() error { for _, v := range s.consumers { if err := v.Run(); err != nil { return err } } return nil } func (s *baseConsumerService) Stop() error { for _, v := range s.consumers { if err := v.Stop(); err != nil { return err } } return nil } type BaseConsumer struct { host, topic, consumerChan string wg *sync.WaitGroup consumer *nsq.Consumer } func (s *BaseConsumer) Stop() error { s.consumer.Stop() s.wg.Wait() return nil } func (s *BaseConsumer) Run() error { return s.consumer.ConnectToNSQD(s.host) } func NewBaseConsumer(host, topic, consumerChan string, fun func(msg *nsq.Message) error) *BaseConsumer { resp := &BaseConsumer{ wg: &sync.WaitGroup{}, consumer: nil, host: host, topic: topic, consumerChan: consumerChan, } consumer, err := nsq.NewConsumer(topic, consumerChan, nsq.NewConfig()) if err != nil { panic(err) } consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { resp.wg.Add(1) defer resp.wg.Done() return fun(msg) })) resp.consumer = consumer return resp }