package main import ( "sync" "sync/atomic" ) // DefaultEventChannelBuffer is the buffer size used for new event channels. // Use a small buffer to avoid accidental blocking in common cases while still // keeping memory usage modest. const DefaultEventChannelBuffer = 8 type Data map[string]interface{} // SafeCounter is a concurrency safe counter. type SafeCounter struct { v *uint64 } // NewSafeCounter creates a new counter. func NewSafeCounter() *SafeCounter { return &SafeCounter{ v: new(uint64), } } // Value returns the current value. func (c *SafeCounter) Value() int { return int(atomic.LoadUint64(c.v)) } // IncBy increments the counter by given delta. func (c *SafeCounter) IncBy(add uint) { atomic.AddUint64(c.v, uint64(add)) } // Inc increments the counter by 1. func (c *SafeCounter) Inc() { c.IncBy(1) } // DecBy decrements the counter by given delta. func (c *SafeCounter) DecBy(dec uint) { atomic.AddUint64(c.v, ^uint64(dec-1)) } // Dec decrements the counter by 1. func (c *SafeCounter) Dec() { c.DecBy(1) } type TopicStats struct { Name string PublishedCount *SafeCounter SubscriberCount *SafeCounter } type topicStatsMap map[string]*TopicStats // Stats is concurrency-safe. type Stats struct { mu sync.RWMutex data topicStatsMap } func newStats() *Stats { return &Stats{ data: map[string]*TopicStats{}, } } func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats { s.mu.Lock() defer s.mu.Unlock() if ts, ok := s.data[topicName]; ok { return ts } ts := &TopicStats{ Name: topicName, PublishedCount: NewSafeCounter(), SubscriberCount: NewSafeCounter(), } s.data[topicName] = ts return ts } func (s *Stats) incSubscriberCountByTopic(topicName string) { s.getOrCreateTopicStats(topicName).SubscriberCount.Inc() } func (s *Stats) decSubscriberCountByTopic(topicName string) { // ensure topic exists, then decrement s.getOrCreateTopicStats(topicName).SubscriberCount.Dec() } func (s *Stats) GetSubscriberCountByTopic(topicName string) int { return s.getOrCreateTopicStats(topicName).SubscriberCount.Value() } func (s *Stats) incPublishedCountByTopic(topicName string) { s.getOrCreateTopicStats(topicName).PublishedCount.Inc() } func (s *Stats) GetPublishedCountByTopic(topicName string) int { return s.getOrCreateTopicStats(topicName).PublishedCount.Value() } func (s *Stats) GetTopicStats() []*TopicStats { s.mu.RLock() defer s.mu.RUnlock() tStatsSlice := make([]*TopicStats, 0, len(s.data)) for _, tStats := range s.data { tStatsSlice = append(tStatsSlice, tStats) } return tStatsSlice } func (s *Stats) GetTopicStatsByName(topicName string) *TopicStats { return s.getOrCreateTopicStats(topicName) } // Event holds topic name and data. type Event struct { Data Data Topic string wg *sync.WaitGroup } // Done calls Done on sync.WaitGroup if set. func (e *Event) Done() { if e.wg != nil { e.wg.Done() } } // CallbackFunc Defines a CallbackFunc. type CallbackFunc func(topic string, data Data) // EventChannel is a channel which can accept an Event. type EventChannel chan Event // NewEventChannel Creates a new EventChannel with default buffer. func NewEventChannel() EventChannel { return make(EventChannel, DefaultEventChannelBuffer) } // NewUnbufferedEventChannel creates an unbuffered channel (exposed if caller wants it). func NewUnbufferedEventChannel() EventChannel { return make(EventChannel) } // eventChannelSlice is a slice of EventChannels. type eventChannelSlice []EventChannel // EventBus stores the information about subscribers interested for a particular topic. type EventBus struct { mu sync.RWMutex subscribers map[string]eventChannelSlice stats *Stats } // NewEventBus returns a new EventBus instance. func NewEventBus() *EventBus { return &EventBus{ //nolint:exhaustivestruct subscribers: map[string]eventChannelSlice{}, stats: newStats(), } } // getSubscribingChannels returns all subscribing channels including wildcard matches. func (eb *EventBus) getSubscribingChannels(topic string) eventChannelSlice { eb.mu.RLock() defer eb.mu.RUnlock() subChannels := eventChannelSlice{} for topicName, chans := range eb.subscribers { if topicName == topic || matchWildcard(topicName, topic) { // append clone to avoid races on the underlying slice subChannels = append(subChannels, chans...) } } return subChannels } // doPublish sends the event to each channel synchronously (blocking sends). // This is used by Publish (synchronous) so the WaitGroup semantics are preserved. func (eb *EventBus) doPublish(channels eventChannelSlice, evt Event) { for _, ch := range channels { ch <- evt // blocking send; Publish relies on this behavior } } // doPublishAsync tries to send to each channel without blocking — dropped sends are ignored. // This is used by PublishAsync to avoid goroutine leaks when subscribers are slow/missing. func (eb *EventBus) doPublishAsync(channels eventChannelSlice, evt Event) { for _, ch := range channels { select { case ch <- evt: // delivered default: // subscriber not ready; drop the event for this subscriber // optionally: count drops or log } } } // Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go func matchWildcard(pattern, name string) bool { if pattern == "" { return name == pattern } if pattern == "*" { return true } // Does only wildcard '*' match. return deepMatchRune([]rune(name), []rune(pattern), true) } // Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go func deepMatchRune(str, pattern []rune, simple bool) bool { //nolint:unparam for len(pattern) > 0 { switch pattern[0] { default: if len(str) == 0 || str[0] != pattern[0] { return false } case '*': return deepMatchRune(str, pattern[1:], simple) || (len(str) > 0 && deepMatchRune(str[1:], pattern, simple)) } str = str[1:] pattern = pattern[1:] } return len(str) == 0 && len(pattern) == 0 } // PublishAsync data to a topic asynchronously. // This will try to deliver events without blocking; slow/missing subscribers may miss events. func (eb *EventBus) PublishAsync(topic string, data Data) { channels := eb.getSubscribingChannels(topic) evt := Event{ Data: data, Topic: topic, wg: nil, } // run async non-blocking publisher in a goroutine so caller isn't blocked go eb.doPublishAsync(channels, evt) eb.stats.incPublishedCountByTopic(topic) } // PublishAsyncOnce same as PublishAsync but makes sure that topic is only published once. func (eb *EventBus) PublishAsyncOnce(topic string, data Data) { if eb.stats.GetPublishedCountByTopic(topic) > 0 { return } eb.PublishAsync(topic, data) } // Publish data to a topic and wait for all subscribers to finish // This function creates a waitGroup internally. All subscribers must call Done() function on Event. func (eb *EventBus) Publish(topic string, data Data) interface{} { channels := eb.getSubscribingChannels(topic) wg := sync.WaitGroup{} wg.Add(len(channels)) evt := Event{ Data: data, Topic: topic, wg: &wg, } // synchronous blocking publish: callers wait until subscribers call Done() eb.doPublish(channels, evt) wg.Wait() eb.stats.incPublishedCountByTopic(topic) return data } // PublishOnce same as Publish but makes sure only published once on topic. func (eb *EventBus) PublishOnce(topic string, data Data) interface{} { if eb.stats.GetPublishedCountByTopic(topic) > 0 { return nil } return eb.Publish(topic, data) } // Subscribe to a topic returning a buffered EventChannel (to reduce accidental blocking). func (eb *EventBus) Subscribe(topic string) EventChannel { ch := NewEventChannel() eb.SubscribeChannel(topic, ch) return ch } // SubscribeChannel subscribes to a given Channel. func (eb *EventBus) SubscribeChannel(topic string, ch EventChannel) { eb.mu.Lock() defer eb.mu.Unlock() if prev, found := eb.subscribers[topic]; found { eb.subscribers[topic] = append(prev, ch) } else { eb.subscribers[topic] = append([]EventChannel{}, ch) } eb.stats.incSubscriberCountByTopic(topic) } // Unsubscribe removes a previously-subscribed channel for a topic. func (eb *EventBus) Unsubscribe(topic string, ch EventChannel) { eb.UnsubscribeChannel(topic, ch) } // UnsubscribeChannel removes a channel from subscribers and decrements the counter. func (eb *EventBus) UnsubscribeChannel(topic string, ch EventChannel) { eb.mu.Lock() defer eb.mu.Unlock() if chans, ok := eb.subscribers[topic]; ok { newChans := make(eventChannelSlice, 0, len(chans)) removed := false for _, c := range chans { if c == ch && !removed { removed = true continue } newChans = append(newChans, c) } if removed { if len(newChans) == 0 { delete(eb.subscribers, topic) } else { eb.subscribers[topic] = newChans } // decrement subscriber counter once eb.stats.decSubscriberCountByTopic(topic) } } } // SubscribeCallback provides a simple wrapper that allows to directly register CallbackFunc instead of channels. // The callback keeps receiving events until the channel is unsubscribed or closed. // recover() is used so panics in user callback won't kill the goroutine. func (eb *EventBus) SubscribeCallback(topic string, callable CallbackFunc) EventChannel { ch := NewEventChannel() eb.SubscribeChannel(topic, ch) go func(callable CallbackFunc, ch EventChannel) { for evt := range ch { func() { defer func() { if r := recover(); r != nil { // recovered from panic in callback; swallow or log as needed } }() callable(evt.Topic, evt.Data) evt.Done() }() } }(callable, ch) return ch } // HasSubscribers Check if a topic has subscribers. func (eb *EventBus) HasSubscribers(topic string) bool { return len(eb.getSubscribingChannels(topic)) > 0 } // Stats returns the stats map. func (eb *EventBus) Stats() *Stats { return eb.stats }