You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

92 lines
1.6 KiB

2 years ago
package mq
import (
2 years ago
"context"
2 years ago
"encoding/json"
"fmt"
2 years ago
"git.diulo.com/mogfee/kit/log"
"git.diulo.com/mogfee/kit/xgo"
2 years ago
"github.com/nsqio/go-nsq"
"sync"
"sync/atomic"
)
type nsqLogger struct {
2 years ago
topic string
client *nsq.Producer
list chan []byte
wg sync.WaitGroup
stop atomic.Bool
2 years ago
}
2 years ago
func (s *nsqLogger) Start(ctx context.Context) error {
2 years ago
xgo.Go(func() {
2 years ago
for v := range s.list {
2 years ago
if s.stop.Load() {
break
}
2 years ago
fmt.Println(string(v))
func(v []byte) {
s.wg.Add(1)
defer s.wg.Done()
if err := s.client.Publish(s.topic, v); err != nil {
fmt.Println(err)
}
}(v)
2 years ago
}
2 years ago
})
return nil
2 years ago
}
2 years ago
2 years ago
func (s *nsqLogger) Stop(ctx context.Context) error {
2 years ago
if s.stop.Load() {
return nil
}
s.stop.Store(true)
s.wg.Wait()
return nil
}
2 years ago
// Name reports the fixed identifier of this component, "app_log".
func (s *nsqLogger) Name() string {
	const componentName = "app_log"
	return componentName
}
2 years ago
func NewNsqLogger(host string, topic string) *nsqLogger {
2 years ago
client, err := nsq.NewProducer(host, nsq.NewConfig())
if err != nil {
panic(err)
}
std := &nsqLogger{
2 years ago
topic: topic,
client: client,
list: make(chan []byte, 1000),
wg: sync.WaitGroup{},
2 years ago
}
return std
}
func (l *nsqLogger) Log(level log.Level, keyvals ...any) error {
if l.stop.Load() {
fmt.Println("log is stop")
return nil
}
if len(keyvals) == 0 {
return nil
}
if len(keyvals)&1 == 1 {
keyvals = append(keyvals, "KEYVALS UNPAIRED")
}
2 years ago
buf := make(map[string]any)
2 years ago
buf["status"] = level.String()
for i := 0; i < len(keyvals); i += 2 {
2 years ago
buf[fmt.Sprintf("%v", keyvals[i])] = keyvals[i+1]
}
body, err := json.Marshal(buf)
if err != nil {
return err
2 years ago
}
l.list <- body
return nil
}