2025-11-19 06:44:26 +01:00

391 lines
9.8 KiB
Go

package main
import (
"sync"
"sync/atomic"
)
// DefaultEventChannelBuffer is the buffer size used for new event channels.
// Use a small buffer to avoid accidental blocking in common cases while still
// keeping memory usage modest.
const DefaultEventChannelBuffer = 8

// Data is the payload attached to an Event: a map from string keys to
// arbitrary values.
type Data map[string]interface{}
// SafeCounter is a counter that is safe for concurrent use.
//
// The value is kept behind a pointer to a separately allocated uint64. The
// sync/atomic documentation guarantees that an allocated variable is 64-bit
// aligned, which 64-bit atomic operations need on 32-bit platforms.
type SafeCounter struct {
	v *uint64
}

// NewSafeCounter creates a new counter starting at zero.
func NewSafeCounter() *SafeCounter {
	return &SafeCounter{
		v: new(uint64),
	}
}

// Value returns the current value.
//
// The counter is stored unsigned and converted to int here, so a counter
// that was decremented below zero has wrapped around; for example a single
// Dec on a fresh counter makes Value return -1.
func (c *SafeCounter) Value() int {
	return int(atomic.LoadUint64(c.v))
}

// IncBy increments the counter by the given delta.
func (c *SafeCounter) IncBy(add uint) {
	atomic.AddUint64(c.v, uint64(add))
}

// Inc increments the counter by 1.
func (c *SafeCounter) Inc() {
	c.IncBy(1)
}

// DecBy decrements the counter by the given delta. A delta of 0 leaves the
// counter unchanged.
func (c *SafeCounter) DecBy(dec uint) {
	// Adding the two's complement of dec subtracts it. The delta is widened
	// to uint64 before subtracting 1 so that dec == 0 yields an addend of 0
	// even where uint is 32 bits wide; doing the subtraction in uint width
	// would turn DecBy(0) into "subtract 2^32" on such platforms.
	atomic.AddUint64(c.v, ^(uint64(dec) - 1))
}

// Dec decrements the counter by 1.
func (c *SafeCounter) Dec() {
	c.DecBy(1)
}
// TopicStats groups the counters kept for a single topic.
type TopicStats struct {
// Name is the topic name this entry was created for.
Name string
// PublishedCount is incremented once per Publish or PublishAsync on the topic.
PublishedCount *SafeCounter
// SubscriberCount is incremented on subscribe and decremented on unsubscribe.
SubscriberCount *SafeCounter
}
// topicStatsMap maps a topic name to its statistics entry.
type topicStatsMap map[string]*TopicStats
// Stats holds the per-topic statistics of an event bus and is
// concurrency-safe: every access to data goes through mu.
type Stats struct {
// mu guards data.
mu sync.RWMutex
// data holds one entry per topic name, created lazily on first use.
data topicStatsMap
}
// newStats returns an empty Stats whose topic map is ready for use.
func newStats() *Stats {
	s := new(Stats)
	s.data = make(topicStatsMap)
	return s
}
// getOrCreateTopicStats returns the stats entry for topicName, creating and
// storing one with zeroed counters if none exists yet. The lookup and the
// insertion both happen under the exclusive lock.
func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats {
	s.mu.Lock()
	defer s.mu.Unlock()

	entry, exists := s.data[topicName]
	if !exists {
		entry = &TopicStats{
			Name:            topicName,
			PublishedCount:  NewSafeCounter(),
			SubscriberCount: NewSafeCounter(),
		}
		s.data[topicName] = entry
	}
	return entry
}
// incSubscriberCountByTopic adds one to the subscriber counter of topicName,
// creating the topic's stats entry if it does not exist yet.
func (s *Stats) incSubscriberCountByTopic(topicName string) {
	entry := s.getOrCreateTopicStats(topicName)
	entry.SubscriberCount.Inc()
}
// decSubscriberCountByTopic subtracts one from the subscriber counter of
// topicName. The stats entry is created first if it is missing.
func (s *Stats) decSubscriberCountByTopic(topicName string) {
	entry := s.getOrCreateTopicStats(topicName)
	entry.SubscriberCount.Dec()
}
// GetSubscriberCountByTopic returns the current subscriber count recorded
// for topicName. Asking about an unknown topic creates its (zeroed) stats
// entry as a side effect.
func (s *Stats) GetSubscriberCountByTopic(topicName string) int {
	entry := s.getOrCreateTopicStats(topicName)
	return entry.SubscriberCount.Value()
}
// incPublishedCountByTopic adds one to the published counter of topicName,
// creating the topic's stats entry if it does not exist yet.
func (s *Stats) incPublishedCountByTopic(topicName string) {
	entry := s.getOrCreateTopicStats(topicName)
	entry.PublishedCount.Inc()
}
// GetPublishedCountByTopic returns how many publishes have been recorded for
// topicName. Asking about an unknown topic creates its (zeroed) stats entry
// as a side effect.
func (s *Stats) GetPublishedCountByTopic(topicName string) int {
	entry := s.getOrCreateTopicStats(topicName)
	return entry.PublishedCount.Value()
}
// GetTopicStats returns the stats entries of all known topics. The slice is
// freshly allocated under the read lock, but its elements are the same
// pointers the Stats holds; their order follows map iteration and is
// therefore unspecified.
func (s *Stats) GetTopicStats() []*TopicStats {
	s.mu.RLock()
	defer s.mu.RUnlock()

	all := make([]*TopicStats, len(s.data))
	next := 0
	for _, entry := range s.data {
		all[next] = entry
		next++
	}
	return all
}
// GetTopicStatsByName returns the stats entry for topicName. The result is
// never nil: an entry with zeroed counters is created and stored when the
// topic has not been seen before.
func (s *Stats) GetTopicStatsByName(topicName string) *TopicStats {
	entry := s.getOrCreateTopicStats(topicName)
	return entry
}
// Event holds topic name and data. It is the value sent on subscriber
// channels.
type Event struct {
// Data is the payload handed to Publish or PublishAsync.
Data Data
// Topic is the topic the event was published on.
Topic string
// wg is set by Publish so it can wait for subscribers to call Done; it is
// nil for events sent by PublishAsync.
wg *sync.WaitGroup
}
// Done signals that the subscriber has finished handling the event. It calls
// Done on the event's WaitGroup when one is attached and does nothing
// otherwise.
func (e *Event) Done() {
	if e.wg == nil {
		return
	}
	e.wg.Done()
}
// CallbackFunc is the signature of a function registered through
// SubscribeCallback. It is called with the topic and data of each event
// received.
type CallbackFunc func(topic string, data Data)
// EventChannel is a channel that carries Event values to a subscriber.
type EventChannel chan Event
// NewEventChannel creates a new EventChannel buffered with
// DefaultEventChannelBuffer slots.
func NewEventChannel() EventChannel {
	ch := make(chan Event, DefaultEventChannelBuffer)
	return ch
}
// NewUnbufferedEventChannel creates an EventChannel without a buffer, for
// callers that want every send to rendezvous with a receive.
func NewUnbufferedEventChannel() EventChannel {
	ch := make(chan Event)
	return ch
}
// eventChannelSlice is a slice of EventChannels, used for the list of
// subscribers of one topic.
type eventChannelSlice []EventChannel
// EventBus stores the subscribers interested in each topic, together with
// per-topic statistics.
type EventBus struct {
// mu guards subscribers.
mu sync.RWMutex
// subscribers maps a topic name or wildcard pattern to the channels
// subscribed under it.
subscribers map[string]eventChannelSlice
// stats records publish and subscriber counts per topic.
stats *Stats
}
// NewEventBus returns a new EventBus with no subscribers and empty stats.
func NewEventBus() *EventBus {
	bus := new(EventBus)
	bus.subscribers = make(map[string]eventChannelSlice)
	bus.stats = newStats()
	return bus
}
// getSubscribingChannels returns every channel subscribed under a key that
// equals topic or, read as a wildcard pattern, matches it.
//
// The channel values are copied into a new slice while the read lock is
// held, so the caller can iterate the result after the lock is released.
// The result is empty but non-nil when nothing matches.
func (eb *EventBus) getSubscribingChannels(topic string) eventChannelSlice {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	matched := make(eventChannelSlice, 0)
	for pattern, subs := range eb.subscribers {
		if pattern != topic && !matchWildcard(pattern, topic) {
			continue
		}
		matched = append(matched, subs...)
	}
	return matched
}
// doPublish delivers evt to every channel in turn using blocking sends, so
// it returns only after each channel has accepted the event. Publish relies
// on this: every subscriber it counted in its WaitGroup must actually
// receive the event.
func (eb *EventBus) doPublish(channels eventChannelSlice, evt Event) {
	for i := range channels {
		channels[i] <- evt
	}
}
// doPublishAsync offers evt to every channel with a non-blocking send. A
// channel that cannot accept the event immediately is skipped and the event
// is dropped for that subscriber; drops are neither counted nor logged.
func (eb *EventBus) doPublishAsync(channels eventChannelSlice, evt Event) {
	for i := range channels {
		select {
		case channels[i] <- evt:
			// Delivered.
		default:
			// The channel cannot take the event right now; skip it.
		}
	}
}
// matchWildcard reports whether name matches pattern, where '*' is the only
// wildcard. An empty pattern matches only the empty name, and the pattern
// "*" matches every name.
//
// Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go
func matchWildcard(pattern, name string) bool {
	switch pattern {
	case "":
		return name == ""
	case "*":
		return true
	}
	// Compare rune by rune so multi-byte characters count as one unit.
	return deepMatchRune([]rune(name), []rune(pattern), true)
}
// deepMatchRune reports whether str matches pattern, where '*' in pattern
// matches any run of runes (including none) and every other rune matches
// only itself.
//
// Originally adapted from
// https://github.com/minio/minio/blob/master/pkg/wildcard/match.go. The
// recursive form retried every split point for every '*', so its running
// time grew rapidly with the number of stars in the pattern. This version
// scans iteratively and, on a mismatch, resumes from the most recent '*'
// with that star consuming one more rune, which bounds the work by
// len(str)*len(pattern) and gives the same results.
//
// simple is unused; it is kept only so the signature stays unchanged.
func deepMatchRune(str, pattern []rune, simple bool) bool { //nolint:unparam
	si, pi := 0, 0
	// starPi is the index of the last '*' seen in pattern (-1 if none) and
	// starSi the position in str up to which that star has consumed runes.
	starPi, starSi := -1, 0

	for si < len(str) {
		switch {
		case pi < len(pattern) && pattern[pi] == '*':
			// Remember the star and first try letting it match nothing.
			starPi, starSi = pi, si
			pi++
		case pi < len(pattern) && pattern[pi] == str[si]:
			si++
			pi++
		case starPi >= 0:
			// Mismatch: let the last star swallow one more rune and retry.
			starSi++
			si, pi = starSi, starPi+1
		default:
			return false
		}
	}

	// str is exhausted; only trailing stars may remain in the pattern.
	for pi < len(pattern) && pattern[pi] == '*' {
		pi++
	}
	return pi == len(pattern)
}
// PublishAsync publishes data to a topic without waiting for subscribers.
//
// Delivery is attempted with a non-blocking send to every matching channel;
// a subscriber whose channel cannot accept the event right away misses it.
// Because those sends never block, they are performed on the calling
// goroutine rather than in a freshly spawned one: this avoids one goroutine
// per publish and means successive calls made from the same goroutine are
// offered to each channel in call order.
//
// The event carries no WaitGroup, so a subscriber's call to Done is a no-op.
// The topic's published counter is incremented once per call, whether or not
// any subscriber received the event.
func (eb *EventBus) PublishAsync(topic string, data Data) {
	channels := eb.getSubscribingChannels(topic)
	evt := Event{
		Data:  data,
		Topic: topic,
		wg:    nil,
	}
	eb.doPublishAsync(channels, evt)
	eb.stats.incPublishedCountByTopic(topic)
}
// PublishAsyncOnce behaves like PublishAsync but does nothing when the
// topic's published counter is already above zero.
//
// NOTE(review): the counter check and the publish are separate steps, so two
// concurrent first calls can both publish.
func (eb *EventBus) PublishAsyncOnce(topic string, data Data) {
	alreadyPublished := eb.stats.GetPublishedCountByTopic(topic) > 0
	if !alreadyPublished {
		eb.PublishAsync(topic, data)
	}
}
// Publish sends data to every subscriber of topic and waits until all of
// them have finished.
//
// A WaitGroup sized to the number of matching channels is attached to the
// event. Each send blocks until the channel accepts the event, and Publish
// then waits for the WaitGroup, so every subscriber must call Done on the
// event it receives or Publish never returns. The topic's published counter
// is incremented after the wait. The data argument is returned unchanged.
func (eb *EventBus) Publish(topic string, data Data) interface{} {
	var pending sync.WaitGroup

	targets := eb.getSubscribingChannels(topic)
	pending.Add(len(targets))

	eb.doPublish(targets, Event{
		Data:  data,
		Topic: topic,
		wg:    &pending,
	})
	pending.Wait()

	eb.stats.incPublishedCountByTopic(topic)
	return data
}
// PublishOnce behaves like Publish but returns nil without publishing when
// the topic's published counter is already above zero.
//
// NOTE(review): the counter check and the publish are separate steps, and
// Publish only increments the counter after its subscribers are done, so
// concurrent first calls can both publish.
func (eb *EventBus) PublishOnce(topic string, data Data) interface{} {
	alreadyPublished := eb.stats.GetPublishedCountByTopic(topic) > 0
	if !alreadyPublished {
		return eb.Publish(topic, data)
	}
	return nil
}
// Subscribe registers a new buffered EventChannel for topic and returns it.
// The buffer reduces the chance that a publisher blocks on, or drops events
// for, a subscriber that is briefly busy.
func (eb *EventBus) Subscribe(topic string) EventChannel {
	events := NewEventChannel()
	eb.SubscribeChannel(topic, events)
	return events
}
// SubscribeChannel registers the caller-supplied channel ch as a subscriber
// of topic and increments the topic's subscriber counter.
func (eb *EventBus) SubscribeChannel(topic string, ch EventChannel) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	// Appending to the nil slice of a missing key starts a fresh list.
	eb.subscribers[topic] = append(eb.subscribers[topic], ch)
	eb.stats.incSubscriberCountByTopic(topic)
}
// Unsubscribe removes ch from the subscribers of topic. It is a thin alias
// for UnsubscribeChannel and, like it, does not close the channel.
func (eb *EventBus) Unsubscribe(topic string, ch EventChannel) {
	eb.UnsubscribeChannel(topic, ch)
}
// UnsubscribeChannel removes the first occurrence of ch from the subscribers
// of topic and decrements the topic's subscriber counter once. It does
// nothing when the topic is unknown or ch is not subscribed to it. The
// topic's key is deleted when its last subscriber is removed; the channel
// itself is not closed.
func (eb *EventBus) UnsubscribeChannel(topic string, ch EventChannel) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	current, known := eb.subscribers[topic]
	if !known {
		return
	}

	// Locate the first subscription of ch.
	idx := -1
	for i, candidate := range current {
		if candidate == ch {
			idx = i
			break
		}
	}
	if idx < 0 {
		return
	}

	if len(current) == 1 {
		delete(eb.subscribers, topic)
	} else {
		// Build a new slice instead of editing the stored one in place.
		remaining := make(eventChannelSlice, 0, len(current))
		remaining = append(remaining, current[:idx]...)
		remaining = append(remaining, current[idx+1:]...)
		eb.subscribers[topic] = remaining
	}
	eb.stats.decSubscriberCountByTopic(topic)
}
// SubscribeCallback registers callable as a subscriber of topic, so callers
// can use a function instead of reading from a channel themselves.
//
// A new buffered channel is subscribed and a goroutine is started that calls
// callable for every event received on it, then marks the event as done.
// The goroutine runs until the returned channel is closed; unsubscribing the
// channel stops new deliveries but does not end the goroutine by itself.
//
// A panic in callable is recovered and swallowed so it cannot kill the
// goroutine. Done is deferred so it runs even when callable panics;
// otherwise a synchronous Publish waiting on that event would block forever.
func (eb *EventBus) SubscribeCallback(topic string, callable CallbackFunc) EventChannel {
	ch := NewEventChannel()
	eb.SubscribeChannel(topic, ch)

	go func(callable CallbackFunc, ch EventChannel) {
		for evt := range ch {
			func() {
				// Deferred calls run last-in-first-out: Done runs first,
				// then the recover below, which therefore also covers a
				// panic raised by Done itself.
				defer func() {
					// Recovered panics are deliberately swallowed.
					_ = recover()
				}()
				defer evt.Done()

				callable(evt.Topic, evt.Data)
			}()
		}
	}(callable, ch)

	return ch
}
// HasSubscribers reports whether at least one channel is subscribed to
// topic, either under the exact name or under a wildcard pattern that
// matches it.
func (eb *EventBus) HasSubscribers(topic string) bool {
	matched := eb.getSubscribingChannels(topic)
	return len(matched) != 0
}
// Stats returns the bus's statistics object. The same instance the bus
// updates is returned, not a copy.
func (eb *EventBus) Stats() *Stats {
	return eb.stats
}