李伟乐 2 years ago
parent 155c071770
commit 62b10566cf
  1. 14
      xserver/consumer.go

@ -6,24 +6,24 @@ import (
"sync" "sync"
) )
func NewConsumer(nsqHost string) *consumer { func NewConsumer(nsqHost string) *Consumer {
return &consumer{ return &Consumer{
consumers: make([]*nsq.Consumer, 0), consumers: make([]*nsq.Consumer, 0),
host: nsqHost, host: nsqHost,
} }
} }
type consumer struct { type Consumer struct {
consumers []*nsq.Consumer consumers []*nsq.Consumer
host string host string
wg sync.WaitGroup wg sync.WaitGroup
} }
func (c *consumer) Name() string { func (c *Consumer) Name() string {
return "consumer" return "consumer"
} }
func (s *consumer) Start() error { func (s *Consumer) Start() error {
for _, v := range s.consumers { for _, v := range s.consumers {
if err := v.ConnectToNSQD(s.host); err != nil { if err := v.ConnectToNSQD(s.host); err != nil {
return err return err
@ -31,7 +31,7 @@ func (s *consumer) Start() error {
} }
return nil return nil
} }
func (s *consumer) Shutdown(ctx context.Context) error { func (s *Consumer) Shutdown(ctx context.Context) error {
for _, v := range s.consumers { for _, v := range s.consumers {
v.Stop() v.Stop()
} }
@ -39,7 +39,7 @@ func (s *consumer) Shutdown(ctx context.Context) error {
return nil return nil
} }
func (s *consumer) Register(topic, channel string, fun func(msg *nsq.Message) error) error { func (s *Consumer) Register(topic, channel string, fun func(msg *nsq.Message) error) error {
consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig()) consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
if err != nil { if err != nil {
return err return err

Loading…
Cancel
Save