From 155c071770bae255a6481117d905d6a799790613 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=BC=9F=E4=B9=90?= Date: Mon, 26 Dec 2022 15:14:15 +0800 Subject: [PATCH] x --- go.mod | 2 ++ go.sum | 4 ++++ xserver/consumer.go | 54 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+) create mode 100644 xserver/consumer.go diff --git a/go.mod b/go.mod index 6019776..885672c 100644 --- a/go.mod +++ b/go.mod @@ -18,11 +18,13 @@ require ( github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/validator/v10 v10.10.0 // indirect github.com/goccy/go-json v0.9.7 // indirect + github.com/golang/snappy v0.0.1 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nsqio/go-nsq v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.0.1 // indirect github.com/ugorji/go/codec v1.2.7 // indirect golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect diff --git a/go.sum b/go.sum index 319f0a2..c432da2 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -63,6 +65,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV0GUKU= github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/xserver/consumer.go b/xserver/consumer.go new file mode 100644 index 0000000..60a6424 --- /dev/null +++ b/xserver/consumer.go @@ -0,0 +1,54 @@ +package xserver + +import ( + "context" + "github.com/nsqio/go-nsq" + "sync" +) + +func NewConsumer(nsqHost string) *consumer { + return &consumer{ + consumers: make([]*nsq.Consumer, 0), + host: nsqHost, + } +} + +type consumer struct { + consumers []*nsq.Consumer + host string + wg sync.WaitGroup +} + +func (c *consumer) Name() string { + return "consumer" +} + +func (s *consumer) Start() error { + for _, v := range s.consumers { + if err := v.ConnectToNSQD(s.host); err != nil { + return err + } + } + return nil +} +func (s *consumer) Shutdown(ctx context.Context) error { + for _, v := range s.consumers { + v.Stop() + } + s.wg.Wait() + return nil +} + +func (s *consumer) Register(topic, channel string, fun func(msg *nsq.Message) error) error { + consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig()) + if err != nil { + return err + } + consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + s.wg.Add(1) + defer s.wg.Done() + return fun(msg) + })) + s.consumers = append(s.consumers, consumer) + return nil +}