You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
|
package mq
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"git.diulo.com/mogfee/kit/log"
|
|
|
|
"git.diulo.com/mogfee/kit/xgo"
|
|
|
|
"github.com/nsqio/go-nsq"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
)
|
|
|
|
|
|
|
|
type nsqLogger struct {
|
|
|
|
topic string
|
|
|
|
client *nsq.Producer
|
|
|
|
list chan []byte
|
|
|
|
wg sync.WaitGroup
|
|
|
|
stop atomic.Bool
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *nsqLogger) Start(ctx context.Context) error {
|
|
|
|
xgo.Go(func() {
|
|
|
|
for v := range s.list {
|
|
|
|
if s.stop.Load() {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
fmt.Println(string(v))
|
|
|
|
func(v []byte) {
|
|
|
|
s.wg.Add(1)
|
|
|
|
defer s.wg.Done()
|
|
|
|
if err := s.client.Publish(s.topic, v); err != nil {
|
|
|
|
fmt.Println(err)
|
|
|
|
}
|
|
|
|
}(v)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *nsqLogger) Stop(ctx context.Context) error {
|
|
|
|
if s.stop.Load() {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
s.stop.Store(true)
|
|
|
|
s.wg.Wait()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *nsqLogger) Name() string {
|
|
|
|
return "app_log"
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewNsqLogger(host string, topic string) *nsqLogger {
|
|
|
|
client, err := nsq.NewProducer(host, nsq.NewConfig())
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
std := &nsqLogger{
|
|
|
|
topic: topic,
|
|
|
|
client: client,
|
|
|
|
list: make(chan []byte, 1000),
|
|
|
|
wg: sync.WaitGroup{},
|
|
|
|
}
|
|
|
|
return std
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *nsqLogger) Log(level log.Level, keyvals ...any) error {
|
|
|
|
if l.stop.Load() {
|
|
|
|
fmt.Println("log is stop")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if len(keyvals) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if len(keyvals)&1 == 1 {
|
|
|
|
keyvals = append(keyvals, "KEYVALS UNPAIRED")
|
|
|
|
}
|
|
|
|
buf := make(map[string]any)
|
|
|
|
buf["status"] = level.String()
|
|
|
|
for i := 0; i < len(keyvals); i += 2 {
|
|
|
|
buf[fmt.Sprintf("%v", keyvals[i])] = keyvals[i+1]
|
|
|
|
}
|
|
|
|
body, err := json.Marshal(buf)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
l.list <- body
|
|
|
|
return nil
|
|
|
|
}
|