diff --git a/lcars_v3/embed/create_state_db.sql b/lcars_v3/embed/create_state_db.sql index 241d099..be40c56 100644 --- a/lcars_v3/embed/create_state_db.sql +++ b/lcars_v3/embed/create_state_db.sql @@ -2,7 +2,9 @@ CREATE TABLE ship_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, subsystem TEXT NOT NULL, - severity TEXT CHECK(severity IN ('CRITICAL', 'ALERT', 'WARNING', 'NOTICE', 'INFO')) NOT NULL DEFAULT 'INFO', + severity TEXT CHECK( + severity IN ('CRITICAL', 'ALERT', 'WARNING', 'NOTICE', 'INFO') + ) NOT NULL DEFAULT 'INFO', color TEXT NOT NULL, message TEXT NOT NULL ); @@ -11,4 +13,20 @@ CREATE TABLE crew_member ( id INTEGER PRIMARY KEY AUTOINCREMENT, rank TEXT NOT NULL, name TEXT NOT NULL -); \ No newline at end of file +); + +CREATE TABLE roster ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + crew_id INTEGER NOT NULL, + on_duty timestamp, + off_duty timestamp, + FOREIGN KEY (crew_id) REFERENCES crew_member(id) +); + +CREATE TABLE events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + event_type TEXT NOT NULL, + event_data JSON NOT NULL +); + diff --git a/lcars_v3/eventbus/eventbus.go b/lcars_v3/eventbus/eventbus.go index ed1aa2c..7d13bea 100644 --- a/lcars_v3/eventbus/eventbus.go +++ b/lcars_v3/eventbus/eventbus.go @@ -1,6 +1,5 @@ // modified version, see original: -// https://github.com/dtomasi/go-event-bus/blob/main/event_bus.go - +// https://github.com/dtomasi/go-event-bus/tree/main package eventbus @@ -9,12 +8,108 @@ import ( "sync/atomic" ) -type Rec map[string]interface{} +type Data map[string]interface{} +// SafeCounter is a concurrency safe counter. +type SafeCounter struct { + v *uint64 +} + +// NewSafeCounter creates a new counter. +func NewSafeCounter() *SafeCounter { + return &SafeCounter{ + v: new(uint64), + } +} + +// Value returns the current value. +func (c *SafeCounter) Value() int { + return int(atomic.LoadUint64(c.v)) +} + +// IncBy increments the counter by given delta. +func (c *SafeCounter) IncBy(add uint) { + atomic.AddUint64(c.v, uint64(add)) +} + +// Inc increments the counter by 1. +func (c *SafeCounter) Inc() { + c.IncBy(1) +} + +// DecBy decrements the counter by given delta. +func (c *SafeCounter) DecBy(dec uint) { + atomic.AddUint64(c.v, ^uint64(dec-1)) +} + +// Dec decrements the counter by 1. +func (c *SafeCounter) Dec() { + c.DecBy(1) +} + +type TopicStats struct { + Name string + PublishedCount *SafeCounter + SubscriberCount *SafeCounter +} + +type topicStatsMap map[string]*TopicStats + +type Stats struct { + data topicStatsMap +} + +func newStats() *Stats { + return &Stats{ + data: map[string]*TopicStats{}, + } +} + +func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats { + _, ok := s.data[topicName] + if !ok { + s.data[topicName] = &TopicStats{ + Name: topicName, + PublishedCount: NewSafeCounter(), + SubscriberCount: NewSafeCounter(), + } + } + + return s.data[topicName] +} + +func (s *Stats) incSubscriberCountByTopic(topicName string) { + s.getOrCreateTopicStats(topicName).SubscriberCount.Inc() +} + +func (s *Stats) GetSubscriberCountByTopic(topicName string) int { + return s.getOrCreateTopicStats(topicName).SubscriberCount.Value() +} + +func (s *Stats) incPublishedCountByTopic(topicName string) { + s.getOrCreateTopicStats(topicName).PublishedCount.Inc() +} + +func (s *Stats) GetPublishedCountByTopic(topicName string) int { + return s.getOrCreateTopicStats(topicName).PublishedCount.Value() +} + +func (s *Stats) GetTopicStats() []*TopicStats { + var tStatsSlice []*TopicStats + for _, tStats := range s.data { + tStatsSlice = append(tStatsSlice, tStats) + } + + return tStatsSlice +} + +func (s *Stats) GetTopicStatsByName(topicName string) *TopicStats { + return s.getOrCreateTopicStats(topicName) +} // Event holds topic name and data. type Event struct { - Data Rec + Data Data Topic string wg *sync.WaitGroup } @@ -27,7 +122,7 @@ func (e *Event) Done() { } // CallbackFunc Defines a CallbackFunc. -type CallbackFunc func(topic string, data Rec) +type CallbackFunc func(topic string, data Data) // EventChannel is a channel which can accept an Event. type EventChannel chan Event @@ -116,7 +211,7 @@ func deepMatchRune(str, pattern []rune, simple bool) bool { //nolint:unparam // PublishAsync data to a topic asynchronously // This function returns a bool channel which indicates that all subscribers where called. -func (eb *EventBus) PublishAsync(topic string, data Rec) { +func (eb *EventBus) PublishAsync(topic string, data Data) { eb.doPublish( eb.getSubscribingChannels(topic), Event{ @@ -129,7 +224,7 @@ func (eb *EventBus) PublishAsync(topic string, data Rec) { } // PublishAsyncOnce same as PublishAsync but makes sure that topic is only published once. -func (eb *EventBus) PublishAsyncOnce(topic string, data Rec) { +func (eb *EventBus) PublishAsyncOnce(topic string, data Data) { if eb.stats.GetPublishedCountByTopic(topic) > 0 { return } @@ -139,7 +234,7 @@ func (eb *EventBus) PublishAsyncOnce(topic string, data Rec) { // Publish data to a topic and wait for all subscribers to finish // This function creates a waitGroup internally. All subscribers must call Done() function on Event. -func (eb *EventBus) Publish(topic string, data Rec) Rec { +func (eb *EventBus) Publish(topic string, data Data) interface{} { wg := sync.WaitGroup{} channels := eb.getSubscribingChannels(topic) wg.Add(len(channels)) @@ -158,7 +253,7 @@ func (eb *EventBus) Publish(topic string, data Rec) Rec { } // PublishOnce same as Publish but makes sure only published once on topic. -func (eb *EventBus) PublishOnce(topic string, data Rec) Rec { +func (eb *EventBus) PublishOnce(topic string, data Data) interface{} { if eb.stats.GetPublishedCountByTopic(topic) > 0 { return nil } @@ -213,100 +308,3 @@ func (eb *EventBus) HasSubscribers(topic string) bool { func (eb *EventBus) Stats() *Stats { return eb.stats } - -type TopicStats struct { - Name string - PublishedCount *SafeCounter - SubscriberCount *SafeCounter -} - -type topicStatsMap map[string]*TopicStats - -type Stats struct { - data topicStatsMap -} - -func newStats() *Stats { - return &Stats{ - data: map[string]*TopicStats{}, - } -} - -func (s *Stats) getOrCreateTopicStats(topicName string) *TopicStats { - _, ok := s.data[topicName] - if !ok { - s.data[topicName] = &TopicStats{ - Name: topicName, - PublishedCount: NewSafeCounter(), - SubscriberCount: NewSafeCounter(), - } - } - - return s.data[topicName] -} - -func (s *Stats) incSubscriberCountByTopic(topicName string) { - s.getOrCreateTopicStats(topicName).SubscriberCount.Inc() -} - -func (s *Stats) GetSubscriberCountByTopic(topicName string) int { - return s.getOrCreateTopicStats(topicName).SubscriberCount.Value() -} - -func (s *Stats) incPublishedCountByTopic(topicName string) { - s.getOrCreateTopicStats(topicName).PublishedCount.Inc() -} - -func (s *Stats) GetPublishedCountByTopic(topicName string) int { - return s.getOrCreateTopicStats(topicName).PublishedCount.Value() -} - -func (s *Stats) GetTopicStats() []*TopicStats { - var tStatsSlice []*TopicStats - for _, tStats := range s.data { - tStatsSlice = append(tStatsSlice, tStats) - } - - return tStatsSlice -} - -func (s *Stats) GetTopicStatsByName(topicName string) *TopicStats { - return s.getOrCreateTopicStats(topicName) -} - -// SafeCounter is a concurrency safe counter. -type SafeCounter struct { - v *uint64 -} - -// NewSafeCounter creates a new counter. -func NewSafeCounter() *SafeCounter { - return &SafeCounter{ - v: new(uint64), - } -} - -// Value returns the current value. -func (c *SafeCounter) Value() int { - return int(atomic.LoadUint64(c.v)) -} - -// IncBy increments the counter by given delta. -func (c *SafeCounter) IncBy(add uint) { - atomic.AddUint64(c.v, uint64(add)) -} - -// Inc increments the counter by 1. -func (c *SafeCounter) Inc() { - c.IncBy(1) -} - -// DecBy decrements the counter by given delta. -func (c *SafeCounter) DecBy(dec uint) { - atomic.AddUint64(c.v, ^uint64(dec-1)) -} - -// Dec decrements the counter by 1. -func (c *SafeCounter) Dec() { - c.DecBy(1) -} diff --git a/lcars_v3/go.mod b/lcars_v3/go.mod index abb8490..5a773da 100644 --- a/lcars_v3/go.mod +++ b/lcars_v3/go.mod @@ -10,7 +10,6 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/starfederation/datastar-go v1.0.2 // indirect golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect golang.org/x/sys v0.34.0 // indirect modernc.org/libc v1.66.3 // indirect diff --git a/lcars_v3/go.sum b/lcars_v3/go.sum index 5aa3bbb..c46fb2d 100644 --- a/lcars_v3/go.sum +++ b/lcars_v3/go.sum @@ -10,8 +10,6 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/starfederation/datastar-go v1.0.2 h1:DrIqBX5jx3nioYwe9mCbtTT/CvJLosFrYbaqaEqfiGY= -github.com/starfederation/datastar-go v1.0.2/go.mod h1:stm83LQkhZkwa5GzzdPEN6dLuu8FVwxIv0w1DYkbD3w= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= diff --git a/lcars_v3/main.go b/lcars_v3/main.go index 99668a2..e06396c 100644 --- a/lcars_v3/main.go +++ b/lcars_v3/main.go @@ -51,6 +51,10 @@ func main() { fmt.Println("------------------") }, 10) + interval.SetInterval(func() { + ebus.Publish("foo:baz", eventbus.Data{"value": "Hallo Welt"}) + }, 100) + runEventbus(ebus) run() @@ -99,12 +103,12 @@ func runEventbus(ebus *eventbus.EventBus) { for i := 0; i < 1000; i++ { - ebus.Publish("foo:baz", eventbus.Rec{"value": i}) - ebus.Publish("pups-klo", eventbus.Rec{"value": i}) - ebus.Publish("pups-klo", eventbus.Rec{"value": i}) - ebus.Publish("ömmels", eventbus.Rec{"value": i}) - ebus.Publish("ömmels", eventbus.Rec{"value": i * 10}) - ebus.Publish("ömmels", eventbus.Rec{"value": i * 100}) + ebus.Publish("foo:baz", eventbus.Data{"value": i}) + ebus.Publish("pups-klo", eventbus.Data{"value": i}) + ebus.Publish("pups-klo", eventbus.Data{"value": i}) + ebus.Publish("ömmels", eventbus.Data{"value": i}) + ebus.Publish("ömmels", eventbus.Data{"value": i * 10}) + ebus.Publish("ömmels", eventbus.Data{"value": i * 100}) } } diff --git a/lcars_v3/sqlite/database.go b/lcars_v3/sqlite/database.go index f3c4b83..7e4aace 100644 --- a/lcars_v3/sqlite/database.go +++ b/lcars_v3/sqlite/database.go @@ -31,6 +31,14 @@ func New(DBName string) (*Database, error) { } func (d *Database) Close() error { + query := ` + PRAGMA analysis_limit = 400; + PRAGMA optimize; + ` + _, err := d.DB().Exec(query) + if err != nil { + return err + } return d.database.Close() } @@ -69,10 +77,14 @@ func openSqliteDB(databasefilename string) (*sql.DB, error) { func createDB(dbfileName string) (*sql.DB, error) { query := ` - PRAGMA page_size = 4096; PRAGMA synchronous = off; - PRAGMA foreign_keys = off; + PRAGMA foreign_keys = on; PRAGMA journal_mode = WAL; + PRAGMA busy_timeout = 5000; + PRAGMA cache_size = 2000; + PRAGMA temp_store = memory; + PRAGMA mmap_size = 30000000000; + PRAGMA page_size = 4096; PRAGMA user_version = 1; ` db, err := sql.Open("sqlite", dbfileName) diff --git a/lcars_v3/state/events.go b/lcars_v3/state/events.go new file mode 100644 index 0000000..c3df2a6 --- /dev/null +++ b/lcars_v3/state/events.go @@ -0,0 +1,2 @@ +package state +