You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
73 lines
1.4 KiB
73 lines
1.4 KiB
2 years ago
|
package consumer
|
||
|
|
||
|
import (
|
||
|
"github.com/nsqio/go-nsq"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
type baseConsumerService struct {
|
||
|
consumers []*BaseConsumer
|
||
|
}
|
||
|
|
||
|
func New() *baseConsumerService {
|
||
|
return &baseConsumerService{
|
||
|
consumers: []*BaseConsumer{},
|
||
|
}
|
||
|
}
|
||
|
func (s *baseConsumerService) Register(b *BaseConsumer) {
|
||
|
s.consumers = append(s.consumers, b)
|
||
|
}
|
||
|
func (s *baseConsumerService) Run() error {
|
||
|
for _, v := range s.consumers {
|
||
|
if err := v.Run(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *baseConsumerService) Stop() error {
|
||
|
for _, v := range s.consumers {
|
||
|
if err := v.Stop(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type BaseConsumer struct {
|
||
|
host, topic, consumerChan string
|
||
|
wg *sync.WaitGroup
|
||
|
consumer *nsq.Consumer
|
||
|
}
|
||
|
|
||
|
func (s *BaseConsumer) Stop() error {
|
||
|
s.consumer.Stop()
|
||
|
s.wg.Wait()
|
||
|
return nil
|
||
|
}
|
||
|
func (s *BaseConsumer) Run() error {
|
||
|
return s.consumer.ConnectToNSQD(s.host)
|
||
|
}
|
||
|
|
||
|
func NewBaseConsumer(host, topic, consumerChan string, fun func(msg *nsq.Message) error) *BaseConsumer {
|
||
|
resp := &BaseConsumer{
|
||
|
wg: &sync.WaitGroup{},
|
||
|
consumer: nil,
|
||
|
host: host,
|
||
|
topic: topic,
|
||
|
consumerChan: consumerChan,
|
||
|
}
|
||
|
consumer, err := nsq.NewConsumer(topic, consumerChan, nsq.NewConfig())
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
|
||
|
resp.wg.Add(1)
|
||
|
defer resp.wg.Done()
|
||
|
return fun(msg)
|
||
|
}))
|
||
|
resp.consumer = consumer
|
||
|
return resp
|
||
|
}
|