master
parent
c67b57a895
commit
c8d4d3b0fe
2 changed files with 80 additions and 0 deletions
@ -0,0 +1,79 @@ |
||||
package mq |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
"git.diulo.com/mogfee/protoc-gen-kit/log" |
||||
"github.com/nsqio/go-nsq" |
||||
"sync" |
||||
"sync/atomic" |
||||
) |
||||
|
||||
type nsqLogger struct { |
||||
topic string |
||||
host string |
||||
list chan []byte |
||||
wg sync.WaitGroup |
||||
stop atomic.Bool |
||||
} |
||||
|
||||
func (s *nsqLogger) Run(client *nsq.Producer) { |
||||
s.wg.Add(1) |
||||
defer s.wg.Done() |
||||
for { |
||||
if s.stop.Load() { |
||||
break |
||||
} |
||||
for v := range s.list { |
||||
if err := client.Publish(s.topic, v); err != nil { |
||||
fmt.Println(err) |
||||
continue |
||||
} |
||||
} |
||||
} |
||||
} |
||||
func (s *nsqLogger) Shutdown() error { |
||||
if s.stop.Load() { |
||||
return nil |
||||
} |
||||
s.stop.Store(true) |
||||
s.wg.Wait() |
||||
return nil |
||||
} |
||||
func NewNsqLogger(host string, topic string) log.Logger { |
||||
client, err := nsq.NewProducer(host, nsq.NewConfig()) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
std := &nsqLogger{ |
||||
topic: topic, |
||||
host: host, |
||||
list: make(chan []byte, 1000), |
||||
wg: sync.WaitGroup{}, |
||||
} |
||||
go std.Run(client) |
||||
return std |
||||
|
||||
} |
||||
|
||||
func (l *nsqLogger) Log(level log.Level, keyvals ...any) error { |
||||
if l.stop.Load() { |
||||
fmt.Println("log is stop") |
||||
return nil |
||||
} |
||||
if len(keyvals) == 0 { |
||||
return nil |
||||
} |
||||
if len(keyvals)&1 == 1 { |
||||
keyvals = append(keyvals, "KEYVALS UNPAIRED") |
||||
} |
||||
buf := make(map[any]any) |
||||
buf["status"] = level.String() |
||||
for i := 0; i < len(keyvals); i += 2 { |
||||
buf[keyvals[i]] = keyvals[i+1] |
||||
} |
||||
body, _ := json.Marshal(buf) |
||||
l.list <- body |
||||
return nil |
||||
} |
Loading…
Reference in new issue