package mq import ( "context" "encoding/json" "fmt" "git.diulo.com/mogfee/protoc-gen-kit/log" "git.diulo.com/mogfee/protoc-gen-kit/pkg/xgo" "github.com/nsqio/go-nsq" "sync" "sync/atomic" ) type nsqLogger struct { topic string client *nsq.Producer list chan []byte wg sync.WaitGroup stop atomic.Bool } func (s *nsqLogger) Start() error { xgo.Go(func() { for { if s.stop.Load() { break } for v := range s.list { fmt.Println(string(v)) func(v []byte) { s.wg.Add(1) defer s.wg.Done() if err := s.client.Publish(s.topic, v); err != nil { fmt.Println(err) } }(v) } } }) return nil } func (s *nsqLogger) Shutdown(ctx context.Context) error { if s.stop.Load() { return nil } s.stop.Store(true) s.wg.Wait() return nil } func (s *nsqLogger) Name() string { return "app_log" } func NewNsqLogger(host string, topic string) log.Logger { client, err := nsq.NewProducer(host, nsq.NewConfig()) if err != nil { panic(err) } std := &nsqLogger{ topic: topic, client: client, list: make(chan []byte, 1000), wg: sync.WaitGroup{}, } return std } func (l *nsqLogger) Log(level log.Level, keyvals ...any) error { if l.stop.Load() { fmt.Println("log is stop") return nil } if len(keyvals) == 0 { return nil } if len(keyvals)&1 == 1 { keyvals = append(keyvals, "KEYVALS UNPAIRED") } buf := make(map[any]any) buf["status"] = level.String() for i := 0; i < len(keyvals); i += 2 { buf[keyvals[i]] = keyvals[i+1] } body, _ := json.Marshal(buf) l.list <- body return nil }