李伟乐 2 years ago
parent 3272f7090d
commit 155c071770
  1. 2
      go.mod
  2. 4
      go.sum
  3. 54
      xserver/consumer.go

@ -18,11 +18,13 @@ require (
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nsqio/go-nsq v1.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect

@ -38,6 +38,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@ -63,6 +65,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV0GUKU=
github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=

@ -0,0 +1,54 @@
package xserver
import (
"context"
"github.com/nsqio/go-nsq"
"sync"
)
func NewConsumer(nsqHost string) *consumer {
return &consumer{
consumers: make([]*nsq.Consumer, 0),
host: nsqHost,
}
}
type consumer struct {
consumers []*nsq.Consumer
host string
wg sync.WaitGroup
}
func (c *consumer) Name() string {
return "consumer"
}
func (s *consumer) Start() error {
for _, v := range s.consumers {
if err := v.ConnectToNSQD(s.host); err != nil {
return err
}
}
return nil
}
func (s *consumer) Shutdown(ctx context.Context) error {
for _, v := range s.consumers {
v.Stop()
}
s.wg.Wait()
return nil
}
func (s *consumer) Register(topic, channel string, fun func(msg *nsq.Message) error) error {
consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
if err != nil {
return err
}
consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
s.wg.Add(1)
defer s.wg.Done()
return fun(msg)
}))
s.consumers = append(s.consumers, consumer)
return nil
}
Loading…
Cancel
Save