You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
72 lines
1.4 KiB
72 lines
1.4 KiB
package consumer |
|
|
|
import ( |
|
"github.com/nsqio/go-nsq" |
|
"sync" |
|
) |
|
|
|
type baseConsumerService struct { |
|
consumers []*BaseConsumer |
|
} |
|
|
|
func New() *baseConsumerService { |
|
return &baseConsumerService{ |
|
consumers: []*BaseConsumer{}, |
|
} |
|
} |
|
func (s *baseConsumerService) Register(b *BaseConsumer) { |
|
s.consumers = append(s.consumers, b) |
|
} |
|
func (s *baseConsumerService) Run() error { |
|
for _, v := range s.consumers { |
|
if err := v.Run(); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (s *baseConsumerService) Stop() error { |
|
for _, v := range s.consumers { |
|
if err := v.Stop(); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
type BaseConsumer struct { |
|
host, topic, consumerChan string |
|
wg *sync.WaitGroup |
|
consumer *nsq.Consumer |
|
} |
|
|
|
func (s *BaseConsumer) Stop() error { |
|
s.consumer.Stop() |
|
s.wg.Wait() |
|
return nil |
|
} |
|
func (s *BaseConsumer) Run() error { |
|
return s.consumer.ConnectToNSQD(s.host) |
|
} |
|
|
|
func NewBaseConsumer(host, topic, consumerChan string, fun func(msg *nsq.Message) error) *BaseConsumer { |
|
resp := &BaseConsumer{ |
|
wg: &sync.WaitGroup{}, |
|
consumer: nil, |
|
host: host, |
|
topic: topic, |
|
consumerChan: consumerChan, |
|
} |
|
consumer, err := nsq.NewConsumer(topic, consumerChan, nsq.NewConfig()) |
|
if err != nil { |
|
panic(err) |
|
} |
|
consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { |
|
resp.wg.Add(1) |
|
defer resp.wg.Done() |
|
return fun(msg) |
|
})) |
|
resp.consumer = consumer |
|
return resp |
|
}
|
|
|