From 9291e6faf9dbd3956f78dfd179a83441a1dca0de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=BC=9F=E4=B9=90?= Date: Fri, 3 Mar 2023 15:08:03 +0800 Subject: [PATCH] x --- log/mq/nsq.go | 59 ++++++++++++++++++++++--------------- pkg/captcha/captcha_test.go | 4 +-- pkg/xgo/xgo.go | 12 ++++++++ 3 files changed, 49 insertions(+), 26 deletions(-) create mode 100644 pkg/xgo/xgo.go diff --git a/log/mq/nsq.go b/log/mq/nsq.go index a80ad07..6acd2a6 100644 --- a/log/mq/nsq.go +++ b/log/mq/nsq.go @@ -1,38 +1,46 @@ package mq import ( + "context" "encoding/json" "fmt" "git.diulo.com/mogfee/protoc-gen-kit/log" + "git.diulo.com/mogfee/protoc-gen-kit/pkg/xgo" "github.com/nsqio/go-nsq" "sync" "sync/atomic" ) type nsqLogger struct { - topic string - host string - list chan []byte - wg sync.WaitGroup - stop atomic.Bool + topic string + client *nsq.Producer + list chan []byte + wg sync.WaitGroup + stop atomic.Bool } -func (s *nsqLogger) Run(client *nsq.Producer) { - s.wg.Add(1) - defer s.wg.Done() - for { - if s.stop.Load() { - break - } - for v := range s.list { - if err := client.Publish(s.topic, v); err != nil { - fmt.Println(err) - continue +func (s *nsqLogger) Start() error { + xgo.Go(func() { + for { + if s.stop.Load() { + break + } + for v := range s.list { + fmt.Println(string(v)) + func(v []byte) { + s.wg.Add(1) + defer s.wg.Done() + if err := s.client.Publish(s.topic, v); err != nil { + fmt.Println(err) + } + }(v) } } - } + }) + return nil } -func (s *nsqLogger) Shutdown() error { + +func (s *nsqLogger) Shutdown(ctx context.Context) error { if s.stop.Load() { return nil } @@ -40,6 +48,11 @@ func (s *nsqLogger) Shutdown() error { s.wg.Wait() return nil } + +func (s *nsqLogger) Name() string { + return "app_log" +} + func NewNsqLogger(host string, topic string) log.Logger { client, err := nsq.NewProducer(host, nsq.NewConfig()) if err != nil { @@ -47,14 +60,12 @@ func NewNsqLogger(host string, topic string) log.Logger { } std := &nsqLogger{ - topic: topic, - host: host, - list: make(chan []byte, 1000), - wg: sync.WaitGroup{}, + topic: topic, + client: client, + list: make(chan []byte, 1000), + wg: sync.WaitGroup{}, } - go std.Run(client) return std - } func (l *nsqLogger) Log(level log.Level, keyvals ...any) error { diff --git a/pkg/captcha/captcha_test.go b/pkg/captcha/captcha_test.go index e91798e..0ceeae9 100644 --- a/pkg/captcha/captcha_test.go +++ b/pkg/captcha/captcha_test.go @@ -8,8 +8,8 @@ import ( func TestGenerateKey(t *testing.T) { client := redis.NewClient(&redis.Options{}) - store := NewRedisStore("user", client) - id, _, err := Generate(store) + store := NewRedisStore(client) + id, _, err := Generate() fmt.Println(id, err) fmt.Println(store.Verify(id, "aa", false)) } diff --git a/pkg/xgo/xgo.go b/pkg/xgo/xgo.go new file mode 100644 index 0000000..74b5c1a --- /dev/null +++ b/pkg/xgo/xgo.go @@ -0,0 +1,12 @@ +package xgo + +import "fmt" + +func Go(fun func()) { + go func() { + if err := recover(); err != nil { + fmt.Println(err) + } + fun() + }() +}