You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

73 lines
1.4 KiB

2 years ago
package consumer
import (
"github.com/nsqio/go-nsq"
"sync"
)
// baseConsumerService groups BaseConsumer values so that they can be started
// and stopped together through Run and Stop.
type baseConsumerService struct {
// consumers holds the registered consumers in the order Register received them.
consumers []*BaseConsumer
}
// New returns a consumer service with no consumers registered yet.
// The consumer list starts out as an empty, non-nil slice.
func New() *baseConsumerService {
	svc := new(baseConsumerService)
	svc.consumers = make([]*BaseConsumer, 0)
	return svc
}
// Register adds b to the end of the service's consumer list, so that later
// calls to Run and Stop include it.
func (s *baseConsumerService) Register(b *BaseConsumer) {
	updated := append(s.consumers, b)
	s.consumers = updated
}
// Run starts every registered consumer in registration order.
//
// It stops at the first consumer whose Run fails and returns that error
// unchanged; consumers after the failing one are not started. It returns nil
// when every consumer started successfully.
func (s *baseConsumerService) Run() error {
	registered := s.consumers
	for i := 0; i < len(registered); i++ {
		err := registered[i].Run()
		if err != nil {
			return err
		}
	}
	return nil
}
// Stop stops every registered consumer in registration order.
//
// Every consumer is asked to stop even when an earlier one reports an error,
// so a single failure cannot leave the remaining consumers running. The first
// error encountered is returned; nil means every consumer stopped without
// error.
func (s *baseConsumerService) Stop() error {
	var firstErr error
	for _, v := range s.consumers {
		// Keep going after a failure: only remember the first error.
		if err := v.Stop(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// BaseConsumer bundles an nsq.Consumer with the address it connects to and a
// WaitGroup that Stop waits on.
type BaseConsumer struct {
	// host is the address Run passes to ConnectToNSQD.
	host string
	// topic is the topic name the consumer was created with.
	topic string
	// consumerChan is the channel name the consumer was created with.
	consumerChan string
	// wg is incremented around each handler invocation and awaited in Stop.
	wg *sync.WaitGroup
	// consumer is the underlying go-nsq consumer.
	consumer *nsq.Consumer
}
// Stop shuts the consumer down and blocks until message handling has ended.
//
// It always returns nil; the error result is part of the method's signature
// and is checked by callers.
func (s *BaseConsumer) Stop() error {
	// nsq.Consumer.Stop only initiates the graceful shutdown.
	s.consumer.Stop()

	// go-nsq closes StopChan once that shutdown has completed. Waiting for it
	// first keeps wg.Wait from racing with a handler's wg.Add(1) while messages
	// are still being dispatched, which sync.WaitGroup does not permit.
	<-s.consumer.StopChan

	// Wait for any handler invocation that is still running.
	s.wg.Wait()
	return nil
}
// Run connects the underlying nsq consumer to the NSQD instance at s.host and
// returns the error from ConnectToNSQD unchanged, or nil on success.
func (s *BaseConsumer) Run() error {
	if err := s.consumer.ConnectToNSQD(s.host); err != nil {
		return err
	}
	return nil
}
// NewBaseConsumer creates a BaseConsumer for the given topic and channel that
// will connect to host when Run is called.
//
// fun is invoked for every message delivered to the handler, and its result is
// returned to go-nsq as the handler result. Each invocation is counted in the
// consumer's WaitGroup for its duration.
//
// The consumer is built with a default nsq.NewConfig(). NewBaseConsumer panics
// if nsq.NewConsumer returns an error.
func NewBaseConsumer(host, topic, consumerChan string, fun func(msg *nsq.Message) error) *BaseConsumer {
	nsqConsumer, err := nsq.NewConsumer(topic, consumerChan, nsq.NewConfig())
	if err != nil {
		panic(err)
	}

	bc := &BaseConsumer{
		host:         host,
		topic:        topic,
		consumerChan: consumerChan,
		wg:           new(sync.WaitGroup),
		consumer:     nsqConsumer,
	}

	// Wrap fun so that every in-flight call is tracked by the WaitGroup.
	tracked := func(msg *nsq.Message) error {
		bc.wg.Add(1)
		defer bc.wg.Done()
		return fun(msg)
	}
	nsqConsumer.AddHandler(nsq.HandlerFunc(tracked))

	return bc
}