313 lines
7.2 KiB
Go
313 lines
7.2 KiB
Go
// modified version, see original:
|
|
// https://github.com/dtomasi/go-event-bus/blob/main/event_bus.go
|
|
|
|
|
|
package eventbus
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
type Rec map[string]interface{}
|
|
|
|
|
|
// Event holds topic name and data.
|
|
type Event struct {
|
|
Data Rec
|
|
Topic string
|
|
wg *sync.WaitGroup
|
|
}
|
|
|
|
// Done calls Done on sync.WaitGroup if set.
|
|
func (e *Event) Done() {
|
|
if e.wg != nil {
|
|
e.wg.Done()
|
|
}
|
|
}
|
|
|
|
// CallbackFunc Defines a CallbackFunc.
|
|
type CallbackFunc func(topic string, data Rec)
|
|
|
|
// EventChannel is a channel which can accept an Event.
|
|
type EventChannel chan Event
|
|
|
|
// NewEventChannel Creates a new EventChannel.
|
|
func NewEventChannel() EventChannel {
|
|
return make(EventChannel)
|
|
}
|
|
|
|
// dataChannelSlice is a slice of DataChannels.
|
|
type eventChannelSlice []EventChannel
|
|
|
|
// EventBus stores the information about subscribers interested for a particular topic.
|
|
type EventBus struct {
|
|
mu sync.RWMutex
|
|
subscribers map[string]eventChannelSlice
|
|
stats *Stats
|
|
}
|
|
|
|
// NewEventBus returns a new EventBus instance.
|
|
func NewEventBus() *EventBus {
|
|
return &EventBus{ //nolint:exhaustivestruct
|
|
subscribers: map[string]eventChannelSlice{},
|
|
stats: newStats(),
|
|
}
|
|
}
|
|
|
|
// getSubscribingChannels returns all subscribing channels including wildcard matches.
|
|
func (eb *EventBus) getSubscribingChannels(topic string) eventChannelSlice {
|
|
subChannels := eventChannelSlice{}
|
|
|
|
for topicName := range eb.subscribers {
|
|
if topicName == topic || matchWildcard(topicName, topic) {
|
|
subChannels = append(subChannels, eb.subscribers[topicName]...)
|
|
}
|
|
}
|
|
|
|
return subChannels
|
|
}
|
|
|
|
// doPublish is publishing events to channels internally.
|
|
func (eb *EventBus) doPublish(channels eventChannelSlice, evt Event) {
|
|
eb.mu.RLock()
|
|
defer eb.mu.RUnlock()
|
|
|
|
go func(channels eventChannelSlice, evt Event) {
|
|
for _, ch := range channels {
|
|
ch <- evt
|
|
}
|
|
}(channels, evt)
|
|
}
|
|
|
|
// Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go
|
|
func matchWildcard(pattern, name string) bool {
|
|
if pattern == "" {
|
|
return name == pattern
|
|
}
|
|
|
|
if pattern == "*" {
|
|
return true
|
|
}
|
|
// Does only wildcard '*' match.
|
|
return deepMatchRune([]rune(name), []rune(pattern), true)
|
|
}
|
|
|
|
// Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go
|
|
func deepMatchRune(str, pattern []rune, simple bool) bool { //nolint:unparam
|
|
for len(pattern) > 0 {
|
|
switch pattern[0] {
|
|
default:
|
|
if len(str) == 0 || str[0] != pattern[0] {
|
|
return false
|
|
}
|
|
case '*':
|
|
return deepMatchRune(str, pattern[1:], simple) ||
|
|
(len(str) > 0 && deepMatchRune(str[1:], pattern, simple))
|
|
}
|
|
|
|
str = str[1:]
|
|
|
|
pattern = pattern[1:]
|
|
}
|
|
|
|
return len(str) == 0 && len(pattern) == 0
|
|
}
|
|
|
|
// PublishAsync data to a topic asynchronously
|
|
// This function returns a bool channel which indicates that all subscribers where called.
|
|
func (eb *EventBus) PublishAsync(topic string, data Rec) {
|
|
eb.doPublish(
|
|
eb.getSubscribingChannels(topic),
|
|
Event{
|
|
Data: data,
|
|
Topic: topic,
|
|
wg: nil,
|
|
})
|
|
|
|
eb.stats.incPublishedCountByTopic(topic)
|
|
}
|
|
|
|
// PublishAsyncOnce same as PublishAsync but makes sure that topic is only published once.
|
|
func (eb *EventBus) PublishAsyncOnce(topic string, data Rec) {
|
|
if eb.stats.GetPublishedCountByTopic(topic) > 0 {
|
|
return
|
|
}
|
|
|
|
eb.PublishAsync(topic, data)
|
|
}
|
|
|
|
// Publish data to a topic and wait for all subscribers to finish
|
|
// This function creates a waitGroup internally. All subscribers must call Done() function on Event.
|
|
func (eb *EventBus) Publish(topic string, data Rec) Rec {
|
|
wg := sync.WaitGroup{}
|
|
channels := eb.getSubscribingChannels(topic)
|
|
wg.Add(len(channels))
|
|
eb.doPublish(
|
|
channels,
|
|
Event{
|
|
Data: data,
|
|
Topic: topic,
|
|
wg: &wg,
|
|
})
|
|
wg.Wait()
|
|
|
|
eb.stats.incPublishedCountByTopic(topic)
|
|
|
|
return data
|
|
}
|
|
|
|
// PublishOnce same as Publish but makes sure only published once on topic.
|
|
func (eb *EventBus) PublishOnce(topic string, data Rec) Rec {
|
|
if eb.stats.GetPublishedCountByTopic(topic) > 0 {
|
|
return nil
|
|
}
|
|
|
|
return eb.Publish(topic, data)
|
|
}
|
|
|
|
// Subscribe to a topic passing a EventChannel.
|
|
func (eb *EventBus) Subscribe(topic string) EventChannel {
|
|
ch := make(EventChannel)
|
|
eb.SubscribeChannel(topic, ch)
|
|
|
|
eb.stats.incSubscriberCountByTopic(topic)
|
|
|
|
return ch
|
|
}
|
|
|
|
// SubscribeChannel subscribes to a given Channel.
|
|
func (eb *EventBus) SubscribeChannel(topic string, ch EventChannel) {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
if prev, found := eb.subscribers[topic]; found {
|
|
eb.subscribers[topic] = append(prev, ch)
|
|
} else {
|
|
eb.subscribers[topic] = append([]EventChannel{}, ch)
|
|
}
|
|
|
|
eb.stats.incSubscriberCountByTopic(topic)
|
|
}
|
|
|
|
// SubscribeCallback provides a simple wrapper that allows to directly register CallbackFunc instead of channels.
|
|
func (eb *EventBus) SubscribeCallback(topic string, callable CallbackFunc) {
|
|
ch := NewEventChannel()
|
|
eb.SubscribeChannel(topic, ch)
|
|
|
|
go func(callable CallbackFunc) {
|
|
evt := <-ch
|
|
callable(evt.Topic, evt.Data)
|
|
evt.Done()
|
|
}(callable)
|
|
|
|
eb.stats.incSubscriberCountByTopic(topic)
|
|
}
|
|
|
|
// HasSubscribers Check if a topic has subscribers.
|
|
func (eb *EventBus) HasSubscribers(topic string) bool {
|
|
return len(eb.getSubscribingChannels(topic)) > 0
|
|
}
|
|
|
|
// Stats returns the stats map.
|
|
func (eb *EventBus) Stats() *Stats {
|
|
return eb.stats
|
|
}
|
|
|
|
type TopicStats struct {
|
|
Name string
|
|
PublishedCount *SafeCounter
|
|
SubscriberCount *SafeCounter
|
|
}
|
|
|
|
type topicStatsMap map[string]*TopicStats
|
|
|
|
type Stats struct {
|
|
data topicStatsMap
|
|
}
|
|
|
|
func newStats() *Stats {
|
|
return &Stats{
|
|
data: map[string]*TopicStats{},
|
|
}
|
|
}
|
|
|
|
func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats {
|
|
_, ok := s.data[topicName]
|
|
if !ok {
|
|
s.data[topicName] = &TopicStats{
|
|
Name: topicName,
|
|
PublishedCount: NewSafeCounter(),
|
|
SubscriberCount: NewSafeCounter(),
|
|
}
|
|
}
|
|
|
|
return s.data[topicName]
|
|
}
|
|
|
|
func (s *Stats) incSubscriberCountByTopic(topicName string) {
|
|
s.getOrCreateTopicStats(topicName).SubscriberCount.Inc()
|
|
}
|
|
|
|
func (s *Stats) GetSubscriberCountByTopic(topicName string) int {
|
|
return s.getOrCreateTopicStats(topicName).SubscriberCount.Value()
|
|
}
|
|
|
|
func (s *Stats) incPublishedCountByTopic(topicName string) {
|
|
s.getOrCreateTopicStats(topicName).PublishedCount.Inc()
|
|
}
|
|
|
|
func (s *Stats) GetPublishedCountByTopic(topicName string) int {
|
|
return s.getOrCreateTopicStats(topicName).PublishedCount.Value()
|
|
}
|
|
|
|
func (s *Stats) GetTopicStats() []*TopicStats {
|
|
var tStatsSlice []*TopicStats
|
|
for _, tStats := range s.data {
|
|
tStatsSlice = append(tStatsSlice, tStats)
|
|
}
|
|
|
|
return tStatsSlice
|
|
}
|
|
|
|
func (s *Stats) GetTopicStatsByName(topicName string) *TopicStats {
|
|
return s.getOrCreateTopicStats(topicName)
|
|
}
|
|
|
|
// SafeCounter is a concurrency safe counter.
|
|
type SafeCounter struct {
|
|
v *uint64
|
|
}
|
|
|
|
// NewSafeCounter creates a new counter.
|
|
func NewSafeCounter() *SafeCounter {
|
|
return &SafeCounter{
|
|
v: new(uint64),
|
|
}
|
|
}
|
|
|
|
// Value returns the current value.
|
|
func (c *SafeCounter) Value() int {
|
|
return int(atomic.LoadUint64(c.v))
|
|
}
|
|
|
|
// IncBy increments the counter by given delta.
|
|
func (c *SafeCounter) IncBy(add uint) {
|
|
atomic.AddUint64(c.v, uint64(add))
|
|
}
|
|
|
|
// Inc increments the counter by 1.
|
|
func (c *SafeCounter) Inc() {
|
|
c.IncBy(1)
|
|
}
|
|
|
|
// DecBy decrements the counter by given delta.
|
|
func (c *SafeCounter) DecBy(dec uint) {
|
|
atomic.AddUint64(c.v, ^uint64(dec-1))
|
|
}
|
|
|
|
// Dec decrements the counter by 1.
|
|
func (c *SafeCounter) Dec() {
|
|
c.DecBy(1)
|
|
}
|