some more crud

This commit is contained in:
thomashamburg 2025-11-19 06:44:26 +01:00
parent db7ed91158
commit a833546227
15 changed files with 4092 additions and 56 deletions

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,23 @@
<!DOCTYPE html >
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Hypermedia</title>
<link rel="stylesheet" href="pico.blue.css">
<meta name="color-scheme" content="light dark">
<script type="module" src="datastar.js"></script>
</head>
<body class="container" data-theme="light">
<h1 style="margin-top: 1em; text-align:center">Hypermedia as the Engine of Application State</h1>
<main class="grid" data-theme="dark">
<div id="init" data-init="@get('/init.html')"></div>
</main>
</body>
</html>

View File

@ -0,0 +1,9 @@
<article id="init">
<header>
<h3>I am the init</h3>
</header>
<p> Init record content</p>
<footer id="footer" data-init="console.log('init'); @get('/init2.html')">
<p>Init record footer</p>
</footer>
</article>

View File

@ -0,0 +1,9 @@
<article id="init">
<header>
<h3>I am the init 2</h3>
</header>
<p> Init record content</p>
<footer id="footer" data-init="console.log('init2'); @get('/init.html')">
<p>Init record footer</p>
</footer>
</article>

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,3 @@
// modified version, see original:
// https://github.com/dtomasi/go-event-bus/tree/main
package eventbus package eventbus
import ( import (
@ -8,6 +5,11 @@ import (
"sync/atomic" "sync/atomic"
) )
// DefaultEventChannelBuffer is the buffer size used for new event channels.
// Use a small buffer to avoid accidental blocking in common cases while still
// keeping memory usage modest.
const DefaultEventChannelBuffer = 8
type Data map[string]interface{} type Data map[string]interface{}
// SafeCounter is a concurrency safe counter. // SafeCounter is a concurrency safe counter.
@ -55,7 +57,9 @@ type TopicStats struct {
type topicStatsMap map[string]*TopicStats type topicStatsMap map[string]*TopicStats
// Stats is concurrency-safe.
type Stats struct { type Stats struct {
mu sync.RWMutex
data topicStatsMap data topicStatsMap
} }
@ -66,22 +70,31 @@ func newStats() *Stats {
} }
func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats { func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats {
_, ok := s.data[topicName] s.mu.Lock()
if !ok { defer s.mu.Unlock()
s.data[topicName] = &TopicStats{
if ts, ok := s.data[topicName]; ok {
return ts
}
ts := &TopicStats{
Name: topicName, Name: topicName,
PublishedCount: NewSafeCounter(), PublishedCount: NewSafeCounter(),
SubscriberCount: NewSafeCounter(), SubscriberCount: NewSafeCounter(),
} }
} s.data[topicName] = ts
return ts
return s.data[topicName]
} }
func (s *Stats) incSubscriberCountByTopic(topicName string) { func (s *Stats) incSubscriberCountByTopic(topicName string) {
s.getOrCreateTopicStats(topicName).SubscriberCount.Inc() s.getOrCreateTopicStats(topicName).SubscriberCount.Inc()
} }
func (s *Stats) decSubscriberCountByTopic(topicName string) {
// ensure topic exists, then decrement
s.getOrCreateTopicStats(topicName).SubscriberCount.Dec()
}
func (s *Stats) GetSubscriberCountByTopic(topicName string) int { func (s *Stats) GetSubscriberCountByTopic(topicName string) int {
return s.getOrCreateTopicStats(topicName).SubscriberCount.Value() return s.getOrCreateTopicStats(topicName).SubscriberCount.Value()
} }
@ -95,7 +108,10 @@ func (s *Stats) GetPublishedCountByTopic(topicName string) int {
} }
func (s *Stats) GetTopicStats() []*TopicStats { func (s *Stats) GetTopicStats() []*TopicStats {
var tStatsSlice []*TopicStats s.mu.RLock()
defer s.mu.RUnlock()
tStatsSlice := make([]*TopicStats, 0, len(s.data))
for _, tStats := range s.data { for _, tStats := range s.data {
tStatsSlice = append(tStatsSlice, tStats) tStatsSlice = append(tStatsSlice, tStats)
} }
@ -127,12 +143,17 @@ type CallbackFunc func(topic string, data Data)
// EventChannel is a channel which can accept an Event. // EventChannel is a channel which can accept an Event.
type EventChannel chan Event type EventChannel chan Event
// NewEventChannel Creates a new EventChannel. // NewEventChannel Creates a new EventChannel with default buffer.
func NewEventChannel() EventChannel { func NewEventChannel() EventChannel {
return make(EventChannel, DefaultEventChannelBuffer)
}
// NewUnbufferedEventChannel creates an unbuffered channel (exposed if caller wants it).
func NewUnbufferedEventChannel() EventChannel {
return make(EventChannel) return make(EventChannel)
} }
// dataChannelSlice is a slice of DataChannels. // eventChannelSlice is a slice of EventChannels.
type eventChannelSlice []EventChannel type eventChannelSlice []EventChannel
// EventBus stores the information about subscribers interested for a particular topic. // EventBus stores the information about subscribers interested for a particular topic.
@ -152,27 +173,41 @@ func NewEventBus() *EventBus {
// getSubscribingChannels returns all subscribing channels including wildcard matches. // getSubscribingChannels returns all subscribing channels including wildcard matches.
func (eb *EventBus) getSubscribingChannels(topic string) eventChannelSlice { func (eb *EventBus) getSubscribingChannels(topic string) eventChannelSlice {
eb.mu.RLock()
defer eb.mu.RUnlock()
subChannels := eventChannelSlice{} subChannels := eventChannelSlice{}
for topicName := range eb.subscribers { for topicName, chans := range eb.subscribers {
if topicName == topic || matchWildcard(topicName, topic) { if topicName == topic || matchWildcard(topicName, topic) {
subChannels = append(subChannels, eb.subscribers[topicName]...) // append clone to avoid races on the underlying slice
subChannels = append(subChannels, chans...)
} }
} }
return subChannels return subChannels
} }
// doPublish is publishing events to channels internally. // doPublish sends the event to each channel synchronously (blocking sends).
// This is used by Publish (synchronous) so the WaitGroup semantics are preserved.
func (eb *EventBus) doPublish(channels eventChannelSlice, evt Event) { func (eb *EventBus) doPublish(channels eventChannelSlice, evt Event) {
eb.mu.RLock()
defer eb.mu.RUnlock()
go func(channels eventChannelSlice, evt Event) {
for _, ch := range channels { for _, ch := range channels {
ch <- evt ch <- evt // blocking send; Publish relies on this behavior
}
}
// doPublishAsync tries to send to each channel without blocking — dropped sends are ignored.
// This is used by PublishAsync to avoid goroutine leaks when subscribers are slow/missing.
func (eb *EventBus) doPublishAsync(channels eventChannelSlice, evt Event) {
for _, ch := range channels {
select {
case ch <- evt:
// delivered
default:
// subscriber not ready; drop the event for this subscriber
// optionally: count drops or log
}
} }
}(channels, evt)
} }
// Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go // Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go
@ -209,16 +244,19 @@ func deepMatchRune(str, pattern []rune, simple bool) bool { //nolint:unparam
return len(str) == 0 && len(pattern) == 0 return len(str) == 0 && len(pattern) == 0
} }
// PublishAsync data to a topic asynchronously // PublishAsync data to a topic asynchronously.
// This function returns a bool channel which indicates that all subscribers where called. // This will try to deliver events without blocking; slow/missing subscribers may miss events.
func (eb *EventBus) PublishAsync(topic string, data Data) { func (eb *EventBus) PublishAsync(topic string, data Data) {
eb.doPublish( channels := eb.getSubscribingChannels(topic)
eb.getSubscribingChannels(topic),
Event{ evt := Event{
Data: data, Data: data,
Topic: topic, Topic: topic,
wg: nil, wg: nil,
}) }
// run async non-blocking publisher in a goroutine so caller isn't blocked
go eb.doPublishAsync(channels, evt)
eb.stats.incPublishedCountByTopic(topic) eb.stats.incPublishedCountByTopic(topic)
} }
@ -235,16 +273,19 @@ func (eb *EventBus) PublishAsyncOnce(topic string, data Data) {
// Publish data to a topic and wait for all subscribers to finish // Publish data to a topic and wait for all subscribers to finish
// This function creates a waitGroup internally. All subscribers must call Done() function on Event. // This function creates a waitGroup internally. All subscribers must call Done() function on Event.
func (eb *EventBus) Publish(topic string, data Data) interface{} { func (eb *EventBus) Publish(topic string, data Data) interface{} {
wg := sync.WaitGroup{}
channels := eb.getSubscribingChannels(topic) channels := eb.getSubscribingChannels(topic)
wg := sync.WaitGroup{}
wg.Add(len(channels)) wg.Add(len(channels))
eb.doPublish(
channels, evt := Event{
Event{
Data: data, Data: data,
Topic: topic, Topic: topic,
wg: &wg, wg: &wg,
}) }
// synchronous blocking publish: callers wait until subscribers call Done()
eb.doPublish(channels, evt)
wg.Wait() wg.Wait()
eb.stats.incPublishedCountByTopic(topic) eb.stats.incPublishedCountByTopic(topic)
@ -261,13 +302,10 @@ func (eb *EventBus) PublishOnce(topic string, data Data) interface{} {
return eb.Publish(topic, data) return eb.Publish(topic, data)
} }
// Subscribe to a topic passing a EventChannel. // Subscribe to a topic returning a buffered EventChannel (to reduce accidental blocking).
func (eb *EventBus) Subscribe(topic string) EventChannel { func (eb *EventBus) Subscribe(topic string) EventChannel {
ch := make(EventChannel) ch := NewEventChannel()
eb.SubscribeChannel(topic, ch) eb.SubscribeChannel(topic, ch)
eb.stats.incSubscriberCountByTopic(topic)
return ch return ch
} }
@ -285,18 +323,60 @@ func (eb *EventBus) SubscribeChannel(topic string, ch EventChannel) {
eb.stats.incSubscriberCountByTopic(topic) eb.stats.incSubscriberCountByTopic(topic)
} }
// Unsubscribe removes a previously-subscribed channel for a topic.
func (eb *EventBus) Unsubscribe(topic string, ch EventChannel) {
eb.UnsubscribeChannel(topic, ch)
}
// UnsubscribeChannel removes a channel from subscribers and decrements the counter.
func (eb *EventBus) UnsubscribeChannel(topic string, ch EventChannel) {
eb.mu.Lock()
defer eb.mu.Unlock()
if chans, ok := eb.subscribers[topic]; ok {
newChans := make(eventChannelSlice, 0, len(chans))
removed := false
for _, c := range chans {
if c == ch && !removed {
removed = true
continue
}
newChans = append(newChans, c)
}
if removed {
if len(newChans) == 0 {
delete(eb.subscribers, topic)
} else {
eb.subscribers[topic] = newChans
}
// decrement subscriber counter once
eb.stats.decSubscriberCountByTopic(topic)
}
}
}
// SubscribeCallback provides a simple wrapper that allows to directly register CallbackFunc instead of channels. // SubscribeCallback provides a simple wrapper that allows to directly register CallbackFunc instead of channels.
func (eb *EventBus) SubscribeCallback(topic string, callable CallbackFunc) { // The callback keeps receiving events until the channel is unsubscribed or closed.
// recover() is used so panics in user callback won't kill the goroutine.
func (eb *EventBus) SubscribeCallback(topic string, callable CallbackFunc) EventChannel {
ch := NewEventChannel() ch := NewEventChannel()
eb.SubscribeChannel(topic, ch) eb.SubscribeChannel(topic, ch)
go func(callable CallbackFunc) { go func(callable CallbackFunc, ch EventChannel) {
evt := <-ch for evt := range ch {
func() {
defer func() {
if r := recover(); r != nil {
// recovered from panic in callback; swallow or log as needed
}
}()
callable(evt.Topic, evt.Data) callable(evt.Topic, evt.Data)
evt.Done() evt.Done()
}(callable) }()
}
}(callable, ch)
eb.stats.incSubscriberCountByTopic(topic) return ch
} }
// HasSubscribers Check if a topic has subscribers. // HasSubscribers Check if a topic has subscribers.

View File

@ -0,0 +1,310 @@
// modified version, see original:
// https://github.com/dtomasi/go-event-bus/tree/main
package eventbus
import (
"sync"
"sync/atomic"
)
type Data map[string]interface{}
// SafeCounter is a concurrency safe counter.
type SafeCounter struct {
v *uint64
}
// NewSafeCounter creates a new counter.
func NewSafeCounter() *SafeCounter {
return &SafeCounter{
v: new(uint64),
}
}
// Value returns the current value.
func (c *SafeCounter) Value() int {
return int(atomic.LoadUint64(c.v))
}
// IncBy increments the counter by given delta.
func (c *SafeCounter) IncBy(add uint) {
atomic.AddUint64(c.v, uint64(add))
}
// Inc increments the counter by 1.
func (c *SafeCounter) Inc() {
c.IncBy(1)
}
// DecBy decrements the counter by given delta.
func (c *SafeCounter) DecBy(dec uint) {
atomic.AddUint64(c.v, ^uint64(dec-1))
}
// Dec decrements the counter by 1.
func (c *SafeCounter) Dec() {
c.DecBy(1)
}
type TopicStats struct {
Name string
PublishedCount *SafeCounter
SubscriberCount *SafeCounter
}
type topicStatsMap map[string]*TopicStats
type Stats struct {
data topicStatsMap
}
func newStats() *Stats {
return &Stats{
data: map[string]*TopicStats{},
}
}
func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats {
_, ok := s.data[topicName]
if !ok {
s.data[topicName] = &TopicStats{
Name: topicName,
PublishedCount: NewSafeCounter(),
SubscriberCount: NewSafeCounter(),
}
}
return s.data[topicName]
}
func (s *Stats) incSubscriberCountByTopic(topicName string) {
s.getOrCreateTopicStats(topicName).SubscriberCount.Inc()
}
func (s *Stats) GetSubscriberCountByTopic(topicName string) int {
return s.getOrCreateTopicStats(topicName).SubscriberCount.Value()
}
func (s *Stats) incPublishedCountByTopic(topicName string) {
s.getOrCreateTopicStats(topicName).PublishedCount.Inc()
}
func (s *Stats) GetPublishedCountByTopic(topicName string) int {
return s.getOrCreateTopicStats(topicName).PublishedCount.Value()
}
func (s *Stats) GetTopicStats() []*TopicStats {
var tStatsSlice []*TopicStats
for _, tStats := range s.data {
tStatsSlice = append(tStatsSlice, tStats)
}
return tStatsSlice
}
func (s *Stats) GetTopicStatsByName(topicName string) *TopicStats {
return s.getOrCreateTopicStats(topicName)
}
// Event holds topic name and data.
type Event struct {
Data Data
Topic string
wg *sync.WaitGroup
}
// Done calls Done on sync.WaitGroup if set.
func (e *Event) Done() {
if e.wg != nil {
e.wg.Done()
}
}
// CallbackFunc Defines a CallbackFunc.
type CallbackFunc func(topic string, data Data)
// EventChannel is a channel which can accept an Event.
type EventChannel chan Event
// NewEventChannel Creates a new EventChannel.
func NewEventChannel() EventChannel {
return make(EventChannel)
}
// dataChannelSlice is a slice of DataChannels.
type eventChannelSlice []EventChannel
// EventBus stores the information about subscribers interested for a particular topic.
type EventBus struct {
mu sync.RWMutex
subscribers map[string]eventChannelSlice
stats *Stats
}
// NewEventBus returns a new EventBus instance.
func NewEventBus() *EventBus {
return &EventBus{ //nolint:exhaustivestruct
subscribers: map[string]eventChannelSlice{},
stats: newStats(),
}
}
// getSubscribingChannels returns all subscribing channels including wildcard matches.
func (eb *EventBus) getSubscribingChannels(topic string) eventChannelSlice {
subChannels := eventChannelSlice{}
for topicName := range eb.subscribers {
if topicName == topic || matchWildcard(topicName, topic) {
subChannels = append(subChannels, eb.subscribers[topicName]...)
}
}
return subChannels
}
// doPublish is publishing events to channels internally.
func (eb *EventBus) doPublish(channels eventChannelSlice, evt Event) {
eb.mu.RLock()
defer eb.mu.RUnlock()
go func(channels eventChannelSlice, evt Event) {
for _, ch := range channels {
ch <- evt
}
}(channels, evt)
}
// Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go
func matchWildcard(pattern, name string) bool {
if pattern == "" {
return name == pattern
}
if pattern == "*" {
return true
}
// Does only wildcard '*' match.
return deepMatchRune([]rune(name), []rune(pattern), true)
}
// Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go
func deepMatchRune(str, pattern []rune, simple bool) bool { //nolint:unparam
for len(pattern) > 0 {
switch pattern[0] {
default:
if len(str) == 0 || str[0] != pattern[0] {
return false
}
case '*':
return deepMatchRune(str, pattern[1:], simple) ||
(len(str) > 0 && deepMatchRune(str[1:], pattern, simple))
}
str = str[1:]
pattern = pattern[1:]
}
return len(str) == 0 && len(pattern) == 0
}
// PublishAsync data to a topic asynchronously
// This function returns a bool channel which indicates that all subscribers where called.
func (eb *EventBus) PublishAsync(topic string, data Data) {
eb.doPublish(
eb.getSubscribingChannels(topic),
Event{
Data: data,
Topic: topic,
wg: nil,
})
eb.stats.incPublishedCountByTopic(topic)
}
// PublishAsyncOnce same as PublishAsync but makes sure that topic is only published once.
func (eb *EventBus) PublishAsyncOnce(topic string, data Data) {
if eb.stats.GetPublishedCountByTopic(topic) > 0 {
return
}
eb.PublishAsync(topic, data)
}
// Publish data to a topic and wait for all subscribers to finish
// This function creates a waitGroup internally. All subscribers must call Done() function on Event.
func (eb *EventBus) Publish(topic string, data Data) interface{} {
wg := sync.WaitGroup{}
channels := eb.getSubscribingChannels(topic)
wg.Add(len(channels))
eb.doPublish(
channels,
Event{
Data: data,
Topic: topic,
wg: &wg,
})
wg.Wait()
eb.stats.incPublishedCountByTopic(topic)
return data
}
// PublishOnce same as Publish but makes sure only published once on topic.
func (eb *EventBus) PublishOnce(topic string, data Data) interface{} {
if eb.stats.GetPublishedCountByTopic(topic) > 0 {
return nil
}
return eb.Publish(topic, data)
}
// Subscribe to a topic passing a EventChannel.
func (eb *EventBus) Subscribe(topic string) EventChannel {
ch := make(EventChannel)
eb.SubscribeChannel(topic, ch)
eb.stats.incSubscriberCountByTopic(topic)
return ch
}
// SubscribeChannel subscribes to a given Channel.
func (eb *EventBus) SubscribeChannel(topic string, ch EventChannel) {
eb.mu.Lock()
defer eb.mu.Unlock()
if prev, found := eb.subscribers[topic]; found {
eb.subscribers[topic] = append(prev, ch)
} else {
eb.subscribers[topic] = append([]EventChannel{}, ch)
}
eb.stats.incSubscriberCountByTopic(topic)
}
// SubscribeCallback provides a simple wrapper that allows to directly register CallbackFunc instead of channels.
func (eb *EventBus) SubscribeCallback(topic string, callable CallbackFunc) {
ch := NewEventChannel()
eb.SubscribeChannel(topic, ch)
go func(callable CallbackFunc) {
evt := <-ch
callable(evt.Topic, evt.Data)
evt.Done()
}(callable)
eb.stats.incSubscriberCountByTopic(topic)
}
// HasSubscribers Check if a topic has subscribers.
func (eb *EventBus) HasSubscribers(topic string) bool {
return len(eb.getSubscribingChannels(topic)) > 0
}
// Stats returns the stats map.
func (eb *EventBus) Stats() *Stats {
return eb.stats
}

View File

@ -0,0 +1,390 @@
package main
import (
"sync"
"sync/atomic"
)
// DefaultEventChannelBuffer is the buffer size used for new event channels.
// Use a small buffer to avoid accidental blocking in common cases while still
// keeping memory usage modest.
const DefaultEventChannelBuffer = 8
type Data map[string]interface{}
// SafeCounter is a concurrency safe counter.
type SafeCounter struct {
v *uint64
}
// NewSafeCounter creates a new counter.
func NewSafeCounter() *SafeCounter {
return &SafeCounter{
v: new(uint64),
}
}
// Value returns the current value.
func (c *SafeCounter) Value() int {
return int(atomic.LoadUint64(c.v))
}
// IncBy increments the counter by given delta.
func (c *SafeCounter) IncBy(add uint) {
atomic.AddUint64(c.v, uint64(add))
}
// Inc increments the counter by 1.
func (c *SafeCounter) Inc() {
c.IncBy(1)
}
// DecBy decrements the counter by given delta.
func (c *SafeCounter) DecBy(dec uint) {
atomic.AddUint64(c.v, ^uint64(dec-1))
}
// Dec decrements the counter by 1.
func (c *SafeCounter) Dec() {
c.DecBy(1)
}
type TopicStats struct {
Name string
PublishedCount *SafeCounter
SubscriberCount *SafeCounter
}
type topicStatsMap map[string]*TopicStats
// Stats is concurrency-safe.
type Stats struct {
mu sync.RWMutex
data topicStatsMap
}
func newStats() *Stats {
return &Stats{
data: map[string]*TopicStats{},
}
}
func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats {
s.mu.Lock()
defer s.mu.Unlock()
if ts, ok := s.data[topicName]; ok {
return ts
}
ts := &TopicStats{
Name: topicName,
PublishedCount: NewSafeCounter(),
SubscriberCount: NewSafeCounter(),
}
s.data[topicName] = ts
return ts
}
func (s *Stats) incSubscriberCountByTopic(topicName string) {
s.getOrCreateTopicStats(topicName).SubscriberCount.Inc()
}
func (s *Stats) decSubscriberCountByTopic(topicName string) {
// ensure topic exists, then decrement
s.getOrCreateTopicStats(topicName).SubscriberCount.Dec()
}
func (s *Stats) GetSubscriberCountByTopic(topicName string) int {
return s.getOrCreateTopicStats(topicName).SubscriberCount.Value()
}
func (s *Stats) incPublishedCountByTopic(topicName string) {
s.getOrCreateTopicStats(topicName).PublishedCount.Inc()
}
func (s *Stats) GetPublishedCountByTopic(topicName string) int {
return s.getOrCreateTopicStats(topicName).PublishedCount.Value()
}
func (s *Stats) GetTopicStats() []*TopicStats {
s.mu.RLock()
defer s.mu.RUnlock()
tStatsSlice := make([]*TopicStats, 0, len(s.data))
for _, tStats := range s.data {
tStatsSlice = append(tStatsSlice, tStats)
}
return tStatsSlice
}
func (s *Stats) GetTopicStatsByName(topicName string) *TopicStats {
return s.getOrCreateTopicStats(topicName)
}
// Event holds topic name and data.
type Event struct {
Data Data
Topic string
wg *sync.WaitGroup
}
// Done calls Done on sync.WaitGroup if set.
func (e *Event) Done() {
if e.wg != nil {
e.wg.Done()
}
}
// CallbackFunc Defines a CallbackFunc.
type CallbackFunc func(topic string, data Data)
// EventChannel is a channel which can accept an Event.
type EventChannel chan Event
// NewEventChannel Creates a new EventChannel with default buffer.
func NewEventChannel() EventChannel {
return make(EventChannel, DefaultEventChannelBuffer)
}
// NewUnbufferedEventChannel creates an unbuffered channel (exposed if caller wants it).
func NewUnbufferedEventChannel() EventChannel {
return make(EventChannel)
}
// eventChannelSlice is a slice of EventChannels.
type eventChannelSlice []EventChannel
// EventBus stores the information about subscribers interested for a particular topic.
type EventBus struct {
mu sync.RWMutex
subscribers map[string]eventChannelSlice
stats *Stats
}
// NewEventBus returns a new EventBus instance.
func NewEventBus() *EventBus {
return &EventBus{ //nolint:exhaustivestruct
subscribers: map[string]eventChannelSlice{},
stats: newStats(),
}
}
// getSubscribingChannels returns all subscribing channels including wildcard matches.
func (eb *EventBus) getSubscribingChannels(topic string) eventChannelSlice {
eb.mu.RLock()
defer eb.mu.RUnlock()
subChannels := eventChannelSlice{}
for topicName, chans := range eb.subscribers {
if topicName == topic || matchWildcard(topicName, topic) {
// append clone to avoid races on the underlying slice
subChannels = append(subChannels, chans...)
}
}
return subChannels
}
// doPublish sends the event to each channel synchronously (blocking sends).
// This is used by Publish (synchronous) so the WaitGroup semantics are preserved.
func (eb *EventBus) doPublish(channels eventChannelSlice, evt Event) {
for _, ch := range channels {
ch <- evt // blocking send; Publish relies on this behavior
}
}
// doPublishAsync tries to send to each channel without blocking — dropped sends are ignored.
// This is used by PublishAsync to avoid goroutine leaks when subscribers are slow/missing.
func (eb *EventBus) doPublishAsync(channels eventChannelSlice, evt Event) {
for _, ch := range channels {
select {
case ch <- evt:
// delivered
default:
// subscriber not ready; drop the event for this subscriber
// optionally: count drops or log
}
}
}
// Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go
func matchWildcard(pattern, name string) bool {
if pattern == "" {
return name == pattern
}
if pattern == "*" {
return true
}
// Does only wildcard '*' match.
return deepMatchRune([]rune(name), []rune(pattern), true)
}
// Code from https://github.com/minio/minio/blob/master/pkg/wildcard/match.go
func deepMatchRune(str, pattern []rune, simple bool) bool { //nolint:unparam
for len(pattern) > 0 {
switch pattern[0] {
default:
if len(str) == 0 || str[0] != pattern[0] {
return false
}
case '*':
return deepMatchRune(str, pattern[1:], simple) ||
(len(str) > 0 && deepMatchRune(str[1:], pattern, simple))
}
str = str[1:]
pattern = pattern[1:]
}
return len(str) == 0 && len(pattern) == 0
}
// PublishAsync data to a topic asynchronously.
// This will try to deliver events without blocking; slow/missing subscribers may miss events.
func (eb *EventBus) PublishAsync(topic string, data Data) {
channels := eb.getSubscribingChannels(topic)
evt := Event{
Data: data,
Topic: topic,
wg: nil,
}
// run async non-blocking publisher in a goroutine so caller isn't blocked
go eb.doPublishAsync(channels, evt)
eb.stats.incPublishedCountByTopic(topic)
}
// PublishAsyncOnce same as PublishAsync but makes sure that topic is only published once.
func (eb *EventBus) PublishAsyncOnce(topic string, data Data) {
if eb.stats.GetPublishedCountByTopic(topic) > 0 {
return
}
eb.PublishAsync(topic, data)
}
// Publish data to a topic and wait for all subscribers to finish
// This function creates a waitGroup internally. All subscribers must call Done() function on Event.
func (eb *EventBus) Publish(topic string, data Data) interface{} {
channels := eb.getSubscribingChannels(topic)
wg := sync.WaitGroup{}
wg.Add(len(channels))
evt := Event{
Data: data,
Topic: topic,
wg: &wg,
}
// synchronous blocking publish: callers wait until subscribers call Done()
eb.doPublish(channels, evt)
wg.Wait()
eb.stats.incPublishedCountByTopic(topic)
return data
}
// PublishOnce same as Publish but makes sure only published once on topic.
func (eb *EventBus) PublishOnce(topic string, data Data) interface{} {
if eb.stats.GetPublishedCountByTopic(topic) > 0 {
return nil
}
return eb.Publish(topic, data)
}
// Subscribe to a topic returning a buffered EventChannel (to reduce accidental blocking).
func (eb *EventBus) Subscribe(topic string) EventChannel {
ch := NewEventChannel()
eb.SubscribeChannel(topic, ch)
return ch
}
// SubscribeChannel subscribes to a given Channel.
func (eb *EventBus) SubscribeChannel(topic string, ch EventChannel) {
eb.mu.Lock()
defer eb.mu.Unlock()
if prev, found := eb.subscribers[topic]; found {
eb.subscribers[topic] = append(prev, ch)
} else {
eb.subscribers[topic] = append([]EventChannel{}, ch)
}
eb.stats.incSubscriberCountByTopic(topic)
}
// Unsubscribe removes a previously-subscribed channel for a topic.
func (eb *EventBus) Unsubscribe(topic string, ch EventChannel) {
eb.UnsubscribeChannel(topic, ch)
}
// UnsubscribeChannel removes a channel from subscribers and decrements the counter.
func (eb *EventBus) UnsubscribeChannel(topic string, ch EventChannel) {
eb.mu.Lock()
defer eb.mu.Unlock()
if chans, ok := eb.subscribers[topic]; ok {
newChans := make(eventChannelSlice, 0, len(chans))
removed := false
for _, c := range chans {
if c == ch && !removed {
removed = true
continue
}
newChans = append(newChans, c)
}
if removed {
if len(newChans) == 0 {
delete(eb.subscribers, topic)
} else {
eb.subscribers[topic] = newChans
}
// decrement subscriber counter once
eb.stats.decSubscriberCountByTopic(topic)
}
}
}
// SubscribeCallback provides a simple wrapper that allows to directly register CallbackFunc instead of channels.
// The callback keeps receiving events until the channel is unsubscribed or closed.
// recover() is used so panics in user callback won't kill the goroutine.
func (eb *EventBus) SubscribeCallback(topic string, callable CallbackFunc) EventChannel {
ch := NewEventChannel()
eb.SubscribeChannel(topic, ch)
go func(callable CallbackFunc, ch EventChannel) {
for evt := range ch {
func() {
defer func() {
if r := recover(); r != nil {
// recovered from panic in callback; swallow or log as needed
}
}()
callable(evt.Topic, evt.Data)
evt.Done()
}()
}
}(callable, ch)
return ch
}
// HasSubscribers Check if a topic has subscribers.
func (eb *EventBus) HasSubscribers(topic string) bool {
return len(eb.getSubscribingChannels(topic)) > 0
}
// Stats returns the stats map.
func (eb *EventBus) Stats() *Stats {
return eb.stats
}

View File

@ -0,0 +1,79 @@
package main
import (
"fmt"
"time"
)
func main() {
eb := NewEventBus()
fmt.Println("=== EventBus Demo ===")
// Subscribe to specific topic
ch1 := eb.Subscribe("orders.created")
// Subscribe to wildcard
ch2 := eb.Subscribe("orders.*")
// Subscribe using a callback
eb.SubscribeCallback("payments.*", func(topic string, data Data) {
fmt.Printf("[Callback] Payment event received: %s -> %+v\n", topic, data)
})
// Run listener goroutines
go func() {
for evt := range ch1 {
fmt.Printf("[Listener 1] Topic=%s Data=%v\n", evt.Topic, evt.Data)
time.Sleep(50 * time.Millisecond) // simulate work
evt.Done()
}
}()
go func() {
for evt := range ch2 {
fmt.Printf("[Listener 2] Wildcard match for %s Data=%v\n", evt.Topic, evt.Data)
evt.Done()
}
}()
// ---- PUBLISH SYNCHRONOUS EVENTS ----
fmt.Println("\n--- Publish Sync ---")
eb.Publish("orders.created", Data{"id": 42, "customer": "Alice"})
eb.Publish("orders.updated", Data{"id": 42, "status": "shipped"})
// ---- PUBLISH ASYNC EVENTS ----
fmt.Println("\n--- Publish Async ---")
eb.PublishAsync("payments.received", Data{"id": 1001, "amount": 99.50})
eb.PublishAsync("payments.refunded", Data{"id": 1002, "amount": 20.00})
// Give async events time to deliver
time.Sleep(200 * time.Millisecond)
// ---- PUBLISH ONCE semantics ----
fmt.Println("\n--- Publish Once ---")
eb.PublishOnce("orders.once", Data{"msg": "first time"})
eb.PublishOnce("orders.once", Data{"msg": "second time (ignored)"})
eb.PublishAsyncOnce("payments.once", Data{"msg": "async first"})
eb.PublishAsyncOnce("payments.once", Data{"msg": "async second (ignored)"})
time.Sleep(100 * time.Millisecond)
// ---- UNSUBSCRIBE ----
fmt.Println("\n--- Unsubscribe Listener 2 (orders.*) ---")
eb.Unsubscribe("orders.*", ch2)
close(ch2)
eb.Publish("orders.updated", Data{"id": 42, "status": "delivered"})
time.Sleep(100 * time.Millisecond)
// ---- PRINT STATS ----
fmt.Println("\n--- Stats ---")
for _, ts := range eb.Stats().GetTopicStats() {
fmt.Printf("Topic: %-20s | Published: %-3d | Subscribers: %-3d\n",
ts.Name, ts.PublishedCount.Value(), ts.SubscriberCount.Value())
}
fmt.Println("\n=== Done ===")
}

View File

@ -2,9 +2,15 @@ module ld
go 1.25.0 go 1.25.0
require modernc.org/sqlite v1.39.0 require (
github.com/CAFxX/httpcompression v0.0.9
github.com/klauspost/compress v1.18.1
github.com/valyala/bytebufferpool v1.0.0
modernc.org/sqlite v1.39.0
)
require ( require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect

View File

@ -1,15 +1,44 @@
github.com/CAFxX/httpcompression v0.0.9 h1:0ue2X8dOLEpxTm8tt+OdHcgA+gbDge0OqFQWGKSqgrg=
github.com/CAFxX/httpcompression v0.0.9/go.mod h1:XX8oPZA+4IDcfZ0A71Hz0mZsv/YJOgYygkFhizVPilM=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f h1:jopqB+UTSdJGEJT8tEqYyE29zN91fi2827oLET8tl7k=
github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/gozstd v1.20.1 h1:xPnnnvjmaDDitMFfDxmQ4vpx0+3CdTg2o3lALvXTU/g=
github.com/valyala/gozstd v1.20.1/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
@ -21,6 +50,10 @@ golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM= modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM=
modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU= modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU=

View File

@ -66,7 +66,7 @@ func run() error {
ctx := context.Background() ctx := context.Background()
stateDB, err := createStateDB(true) stateDB, err := createStateDB(ctx)
if err != nil { if err != nil {
log.Fatalf("Failed to create internal StateDB: %v", err) log.Fatalf("Failed to create internal StateDB: %v", err)
} }

View File

@ -26,6 +26,7 @@ type Server struct {
} }
func New( func New(
ctx context.Context,
stateDB *sqlite.Database, stateDB *sqlite.Database,
embedded embed.FS, embedded embed.FS,
ebus *eventbus.EventBus, ebus *eventbus.EventBus,
@ -34,6 +35,7 @@ func New(
// creating the server // creating the server
return &Server{ return &Server{
Ctx: ctx,
StateDB: stateDB, StateDB: stateDB,
Embedded: embedded, Embedded: embedded,
Ebus: ebus, Ebus: ebus,

View File

@ -148,7 +148,7 @@ To embrace the <strong>Creativity Paradox</strong> and stay within our <strong>C
</v-click> </v-click>
<v-click> <v-click>
<p> <div>
The benefits are clear: The benefits are clear:
- Less code → faster performance and fewer bugs - Less code → faster performance and fewer bugs
@ -156,7 +156,7 @@ The benefits are clear:
- Lower cognitive load → simpler to understand and extend - Lower cognitive load → simpler to understand and extend
- Fewer dependencies → more robust and secure - Fewer dependencies → more robust and secure
- Leverage browser specifications and built-in capabilities - Leverage browser specifications and built-in capabilities
</p> </div>
</v-click> </v-click>
<v-click> <v-click>
@ -630,3 +630,281 @@ transition: slide-up
level: 2 level: 2
--- ---
# to be continued ... # to be continued ...
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
flowchart LR
%% Publishers
Scheduler["⏰ Scheduler<br/>(periodic tasks)"]
API["🌐 REST API<br/>(user actions)"]
Worker["⚙️ Background Job<br/>(processing tasks)"]
%% EventBus
EventBus["🧭 EventBus"]
%% Subscribers
SSE["📡 SSE Broadcaster<br/>(push to clients)"]
Logger["📝 Audit Logger<br/>(write to SQLite)"]
Cache["🗃 Cache Updater"]
Notifier["🔔 Notification Service"]
%% Client UI
Client["💻 Clients<br/> via SSE"]
%% Flows
Scheduler -->|publishes: task.tick| EventBus
API -->|publishes: ticket.created| EventBus
Worker -->|publishes: task.completed| EventBus
EventBus -->|subscribes: task.*| SSE
EventBus -->|subscribes: ticket.*| SSE
EventBus -->|subscribes: *| Logger
EventBus -->|subscribes: cache.invalidate| Cache
EventBus -->|subscribes: notify.*| Notifier
SSE -->|SSE stream| Client
```
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
flowchart LR
subgraph Publishers
Scheduler["⏰ Scheduler<br/>(periodic tasks)"]
API["🌐 API<br/>(user triggers)"]
Worker["⚙️ Background Job"]
end
EventBus["🧭 EventBus<br/>(pub/sub hub)"]
subgraph Subscribers
SSE["📡 SSE Broadcaster"]
Logger["📝 Audit Logger"]
Notifier["🔔 Notification Service"]
Cache["🗃 Cache Updater"]
end
Client["💻 Clients via SSE"]
Scheduler --> EventBus
API --> EventBus
Worker --> EventBus
EventBus --> SSE
EventBus --> Logger
EventBus --> Notifier
EventBus --> Cache
SSE --> Client
```
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
sequenceDiagram
participant P as Publisher (Scheduler)
participant EB as EventBus
participant S1 as SSE Broadcaster
participant S2 as Audit Logger
participant UI as Client UI
P->>EB: Publish("task.tick", data)
EB->>S1: Dispatch event
EB->>S2: Dispatch event
S1->>UI: Push via SSE
S2->>S2: Write to SQLite
```
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
classDiagram
class EventBus {
- mu sync.RWMutex
- subscribers map[string][]EventChannel
- stats *Stats
+ Publish(topic, data)
+ PublishAsync(topic, data)
+ Subscribe(topic) EventChannel
+ SubscribeCallback(topic, cb)
}
class EventChannel {
<<type>> chan Event
}
class Event {
Data map[string]interface
Topic string
wg *sync.WaitGroup
+ Done()
}
class Stats {
- data map[string]TopicStats
+ GetPublishedCountByTopic(t) int
+ GetSubscriberCountByTopic(t) int
}
EventBus --> EventChannel : delivers Events
EventBus --> Stats
Event --> Stats : increments counters
```
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
flowchart TD
Publish["Publish()"]
Mutex["RWMutex Lock"]
Channels["subscribers[topic]"]
GoRoutine["Goroutine for dispatch"]
Sub1["Subscriber #1"]
Sub2["Subscriber #2"]
Publish --> Mutex
Mutex --> Channels
Channels --> GoRoutine
GoRoutine --> Sub1
GoRoutine --> Sub2
```
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
flowchart TB
subgraph Backend
EB["EventBus"]
DB["SQLite Database"]
Scheduler
API
Worker
SSE["SSE Broadcaster"]
Logger
end
subgraph Clients
Browser["Browser UI"]
end
Scheduler --> EB
API --> EB
Worker --> EB
EB --> SSE
EB --> Logger
Logger --> DB
SSE --> Browser
```
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
flowchart LR
Client["🌐 Browser\n(HTML + JS)"]
subgraph Server["🖥 Go Application Server"]
EB["EventBus"]
SSE["SSE Handler"]
API["HTTP API"]
Worker["Background Worker"]
Scheduler["Cron Scheduler"]
DB["SQLite File"]
end
Client <--> SSE
Client <--> API
Scheduler --> Worker
Worker --> EB
API --> EB
EB --> SSE
EB --> DB
```
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
flowchart TD
A["1⃣ Publisher creates event"]
B["2⃣ EventBus receives event"]
C["3⃣ Match subscribers (wildcard + exact)"]
D["4⃣ Dispatch to channels"]
E["5⃣ Subscribers handle event"]
F["6⃣ Optional: push to client via SSE"]
G["7⃣ Stats updated"]
A --> B --> C --> D --> E --> F --> G
```
---
class: default
---
# Eventbus Architecture
<div class="mt-10"/>
```mermaid { scale: 0.68}
flowchart LR
Topic["Incoming Topic: ticket.updated"]
Exact["Exact Match: ticket.updated"]
Wild1["Wildcard: ticket.*"]
Wild2["Wildcard: *"]
Result["Matched Subscribers"]
Topic --> Exact
Topic --> Wild1
Topic --> Wild2
Exact --> Result
Wild1 --> Result
Wild2 --> Result
```