You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
54 lines
1022 B
54 lines
1022 B
package xserver |
|
|
|
import ( |
|
"context" |
|
"github.com/nsqio/go-nsq" |
|
"sync" |
|
) |
|
|
|
func NewConsumer(nsqHost string) *Consumer { |
|
return &Consumer{ |
|
consumers: make([]*nsq.Consumer, 0), |
|
host: nsqHost, |
|
} |
|
} |
|
|
|
type Consumer struct { |
|
consumers []*nsq.Consumer |
|
host string |
|
wg sync.WaitGroup |
|
} |
|
|
|
func (c *Consumer) Name() string { |
|
return "consumer" |
|
} |
|
|
|
func (s *Consumer) Start() error { |
|
for _, v := range s.consumers { |
|
if err := v.ConnectToNSQD(s.host); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
} |
|
func (s *Consumer) Shutdown(ctx context.Context) error { |
|
for _, v := range s.consumers { |
|
v.Stop() |
|
} |
|
s.wg.Wait() |
|
return nil |
|
} |
|
|
|
func (s *Consumer) Register(topic, channel string, fun func(msg *nsq.Message) error) error { |
|
consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig()) |
|
if err != nil { |
|
return err |
|
} |
|
consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { |
|
s.wg.Add(1) |
|
defer s.wg.Done() |
|
return fun(msg) |
|
})) |
|
s.consumers = append(s.consumers, consumer) |
|
return nil |
|
}
|
|
|