# Event Bus

> **Platform Reference** — describes implemented, production behavior.

The Event Bus adds fire-and-forget publish/subscribe messaging to Groups. Publishers emit events on colon-delimited topics, and only peers with matching subscriptions receive them. Messages flow directly over WebRTC (full-mesh flood) with WebSocket relay fallback — no coordinator, no leader.

```javascript
// Subscribe to file changes
const unsub = group.subscribe('file:changed', (event) => {
  console.log(`${event.topic}: ${JSON.stringify(event.data)}`)
})

// Publish an event
group.publish('file:changed', { path: '/photos/cat.jpg', action: 'upload' })

// Unsubscribe
unsub()
```

---

## When to Use Event Bus

| Use Event Bus when... | Use [Shared State](group-shared-state.md) when... | Use [WorkerPool](worker-pool.md) when... | Use [Routes](creating-acequia-apps.md#route-registration) when... |
|---|---|---|---|
| Events are ephemeral (happen and are consumed) | All peers need the same persistent data | Work should go to ONE peer | One canonical handler per URL |
| Multiple subscribers should receive the same event | Updates should merge into shared state | You need request/response semantics | You need `fetch()` semantics |
| High-frequency, fire-and-forget messages | Data is small metadata/config/status | Jobs need retry, timeout, progress | Simple request/response is enough |
| You need topic-based filtering | You need last-write-wins convergence | You need load distribution | A stable handler is expected |

---

## Quick Start

```javascript
import acequia from '/acequia.js'

await acequia.acequiaReady()

const group = new acequia.groups.Group('my-group', {
  displayName: 'My App',
})
await group.ready

// Subscribe — returns an unsubscribe function
const unsub = group.subscribe('file:changed', (event) => {
  console.log(`${event.topic}: ${JSON.stringify(event.data)}`)
})

// Publish to all matching subscribers
group.publish('file:changed', { path: '/photos/cat.jpg', action: 'upload' })

// Publish with retain — replayed to new subscribers
group.publish('status:battery', { level: 0.42 }, { retain: true, ttl: 60000 })

// Unsubscribe when done
unsub()
```

---

## Topic Matching

Topics are colon-delimited segments. Subscription patterns support two wildcards:

| Pattern | Matches | Does not match |
|---------|---------|----------------|
| `chat:general` | `chat:general` | `chat:random`, `chat:general:thread` |
| `chat:*` | `chat:general`, `chat:random` | `chat:general:thread` |
| `sensor:**` | `sensor:temp`, `sensor:temp:living-room` | `chat:general` |
| `**` | everything | — |
| `file:*:upload` | `file:photos:upload` | `file:photos:resize` |

- **`*`** matches exactly one segment
- **`**`** matches one or more segments (must be the last segment in the pattern)
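
The matching rules above can be sketched as a small function. This is an illustrative reimplementation under the rules stated here, not the platform's actual matcher; the name `topicMatches` is hypothetical.

```javascript
// Hypothetical helper illustrating the wildcard rules; not part of the Event Bus API.
function topicMatches(pattern, topic) {
  const p = pattern.split(':')
  const t = topic.split(':')
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '**') {
      // '**' is only valid as the last pattern segment and must
      // consume at least one remaining topic segment.
      return i === p.length - 1 && t.length > i
    }
    if (i >= t.length) return false                    // topic is shorter than the pattern
    if (p[i] !== '*' && p[i] !== t[i]) return false    // literal segment mismatch
  }
  // Without a trailing '**', segment counts must be equal.
  return p.length === t.length
}

console.log(topicMatches('chat:*', 'chat:general'))        // true
console.log(topicMatches('chat:*', 'chat:general:thread')) // false
console.log(topicMatches('sensor:**', 'sensor:temp:living-room')) // true
```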

**Topic naming convention:** Use colon-delimited segments, noun-first: `sensor:temperature:living-room`, `file:activity:upload`, `cursor:move`. This works naturally with wildcard patterns.

---

## Retained Events

A retained event stores the last known value for a topic. When a new subscription matches a retained topic, the event is replayed to it immediately with `event.retained = true`. This is useful for "current state" values such as battery level or connection status.

```javascript
// Publisher: retain the latest sensor reading
group.publish('sensor:temperature', { value: 22.5, unit: 'C' }, {
  retain: true,
  ttl: 60000,  // auto-expire after 60 seconds
})

// Subscriber: receives the retained value immediately on subscribe,
// then receives live updates as they arrive
group.subscribe('sensor:**', (event) => {
  if (event.retained) {
    console.log('Cached value:', event.data)
  } else {
    console.log('Live update:', event.data)
  }
})
```

Without a TTL, retained events persist for the lifetime of the publishing peer. With a TTL, they auto-expire and are removed from the retained cache.
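
To make the TTL behavior concrete, here is a minimal sketch of a last-value cache with optional expiry. It is an assumption about how such a cache could work, not the platform's implementation: the class name `RetainedCache`, the lazy expiry on read, and the injectable `now` clock are all hypothetical.

```javascript
// Hypothetical last-value cache; the real retained cache may use timers instead.
class RetainedCache {
  constructor(now = Date.now) {
    this.now = now              // injectable clock, handy for testing
    this.entries = new Map()    // topic -> { event, expiresAt }
  }

  // Store the latest event for a topic, replacing any previous one.
  set(topic, event, ttl) {
    const expiresAt = typeof ttl === 'number' ? this.now() + ttl : Infinity
    this.entries.set(topic, { event, expiresAt })
  }

  // Return the retained event, or undefined if none exists or it has expired.
  get(topic) {
    const entry = this.entries.get(topic)
    if (!entry) return undefined
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(topic)  // expire lazily on read
      return undefined
    }
    return entry.event
  }
}

const cache = new RetainedCache()
cache.set('sensor:temperature', { value: 22.5, unit: 'C' }, 60000)
console.log(cache.get('sensor:temperature')) // { value: 22.5, unit: 'C' } until the TTL elapses
```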

---

## API

### subscribe(pattern, callback)

```javascript
const unsub = group.subscribe(pattern, callback)
```

Returns an unsubscribe function. The callback receives an event object:

| Field | Type | Description |
|-------|------|-------------|
| `topic` | `string` | The actual topic published to |
| `data` | `any` | The event payload |
| `peerId` | `string` | Publisher's instanceId |
| `ts` | `number` | Timestamp |
| `local` | `boolean` | `true` if published by this peer |
| `retained` | `boolean` | `true` if replayed from retained cache |

### publish(topic, data, options?)

```javascript
group.publish(topic, data, options?)
```

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `retain` | `boolean` | `false` | Cache last value for replay to new subscribers |
| `ttl` | `number` | — | Auto-expire retained event after this many ms |

### subscriberCount(topic)

```javascript
const count = group.subscriberCount('file:changed')
```

Returns the number of remote peers with subscriptions matching the given topic.
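
One possible use is to skip building an expensive payload when no remote peer is listening. The helper below is a sketch, and `publishIfSubscribed` and `buildPayload` are hypothetical names. Because `subscriberCount` counts remote peers only, this guard also skips local subscribers, and it is not suitable for retained events, which should be published even when nobody is subscribed yet.

```javascript
// Hypothetical helper: publish only when at least one remote peer is subscribed.
// `group` is expected to expose subscriberCount(topic) and publish(topic, data, options).
function publishIfSubscribed(group, topic, buildPayload, options) {
  if (group.subscriberCount(topic) === 0) return false  // nobody listening remotely
  group.publish(topic, buildPayload(), options)         // build the payload only when needed
  return true
}

// Example (assumes `group` is a ready Group and `renderThumbnail` is your own function):
// publishIfSubscribed(group, 'preview:thumbnail', () => renderThumbnail())
```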

### subscriptions

```javascript
const patterns = group.subscriptions  // ['file:*', 'cursor:move']
```

Returns an array of this peer's local subscription patterns.

---

## How It Works

**Subscription advertisement:** When a peer subscribes, it broadcasts `eventBus:sub` to all connected peers. Each peer maintains a map of `peerId → Set<pattern>` for remote subscriptions.

**Publish flow:**
1. Peer calls `publish(topic, data)` — local matching subscribers fire synchronously
2. Publisher iterates remote subscriptions — for each peer with a matching pattern, sends the event via WebRTC (or WS relay if WebRTC failed)
3. Receiving peer checks dedup (bounded seen-set of 1000 msgIds), then fires local matching callbacks
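
Step 2 of the flow above can be sketched as a lookup over the remote subscription map. This is an illustration under stated assumptions, not the platform's code: `selectTargets`, its parameters, and the transport labels are hypothetical, and the wildcard matcher is passed in rather than defined here.

```javascript
// Hypothetical sketch of the fan-out step; not the platform's actual code.
// remoteSubs:   Map<peerId, Set<pattern>> built from `eventBus:sub` broadcasts.
// wsRelayPeers: Set<peerId> of peers whose WebRTC handshake failed.
// matches:      (pattern, topic) => boolean, the wildcard matcher (assumed given).
function selectTargets(remoteSubs, wsRelayPeers, topic, matches) {
  const targets = []
  for (const [peerId, patterns] of remoteSubs) {
    // One matching pattern is enough, so each peer gets at most one copy.
    const interested = [...patterns].some((pattern) => matches(pattern, topic))
    if (interested) {
      targets.push({ peerId, transport: wsRelayPeers.has(peerId) ? 'ws-relay' : 'webrtc' })
    }
  }
  return targets
}

// Usage with a simplified matcher that only understands exact topics and '**'.
const simpleMatch = (pattern, topic) => pattern === '**' || pattern === topic
const remoteSubs = new Map([
  ['peer-a', new Set(['file:changed', '**'])],
  ['peer-b', new Set(['cursor:move'])],
  ['peer-c', new Set(['file:changed'])],
])
const targets = selectTargets(remoteSubs, new Set(['peer-c']), 'file:changed', simpleMatch)
console.log(targets)
// [ { peerId: 'peer-a', transport: 'webrtc' }, { peerId: 'peer-c', transport: 'ws-relay' } ]
```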

**Transport:** WebRTC data channels are preferred (shared with state sync and WorkerPool). On `subscribe()`, the peer proactively attempts `connectToPeer()` to all group members. If the WebRTC handshake to a peer fails, that peer is added to `_ebWsRelayPeers` and messages to it are routed through the discovery server's WebSocket relay.

**Deduplication:** Each event gets a `crypto.randomUUID()` msgId. A bounded LRU set (1000 entries) rejects duplicates. This prevents double-delivery when messages arrive via both WebRTC and WS relay.
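
A bounded seen-set of this kind can be sketched with a plain `Set`, which iterates in insertion order. The class below is an assumption about the general technique, not the platform's implementation; `SeenSet` and `firstSeen` are hypothetical names.

```javascript
// Hypothetical bounded LRU seen-set for message IDs.
class SeenSet {
  constructor(limit = 1000) {
    this.limit = limit
    this.ids = new Set()  // insertion order doubles as recency order
  }

  // Returns true the first time an id is seen, false for duplicates.
  firstSeen(msgId) {
    if (this.ids.has(msgId)) {
      // Refresh recency: move the id to the end of the iteration order.
      this.ids.delete(msgId)
      this.ids.add(msgId)
      return false
    }
    this.ids.add(msgId)
    if (this.ids.size > this.limit) {
      // The first value in iteration order is the least recently seen.
      const oldest = this.ids.values().next().value
      this.ids.delete(oldest)
    }
    return true
  }
}

const seen = new SeenSet(1000)
console.log(seen.firstSeen('msg-1')) // true: deliver to local callbacks
console.log(seen.firstSeen('msg-1')) // false: duplicate, e.g. the WS relay copy
```

Once an ID has been evicted, a late duplicate would be delivered again, so the bound trades memory for a small chance of double delivery under very high event rates.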

---

## Patterns

### Activity Feed

```javascript
// Publisher: announce file operations
group.publish('file:activity:upload', {
  path: '/documents/report.pdf',
  size: '2.4MB',
  user: acequia.getInstanceId().slice(0, 8),
})

// Subscriber: display activity log
group.subscribe('file:activity:**', (event) => {
  const action = event.topic.split(':').slice(2).join(':')
  addToFeed(`${event.data.user} ${action}: ${event.data.path}`)
})
```

### Collaborative Cursors

```javascript
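// Each peer publishes its cursor position at a throttled rate (at most once per 50 ms).
// `throttle` is not part of the Event Bus API; supply your own helper or use one such as lodash's `throttle`.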
document.addEventListener('mousemove', throttle((e) => {
  group.publish('cursor:move', { x: e.clientX, y: e.clientY })
}, 50))

// All peers render remote cursors
group.subscribe('cursor:move', (event) => {
  if (!event.local) renderCursor(event.peerId, event.data)
})
```

### App-to-App Pipeline

Multiple apps in the same group can form a processing pipeline:

```javascript
// App 1: Camera capture
group.publish('pipeline:frame', { imageUrl: '/tmp/frame-001.jpg' })

// App 2: Object detection (subscribes to frames, publishes detections)
group.subscribe('pipeline:frame', async (event) => {
  const detections = await detect(event.data.imageUrl)
  group.publish('pipeline:detections', { frame: event.data.imageUrl, objects: detections })
})

// App 3: Dashboard (subscribes to detections)
group.subscribe('pipeline:detections', (event) => {
  updateOverlay(event.data.objects)
})
```

---

## Gotchas

**Best-effort delivery.** Events are fire-and-forget. There is no acknowledgment, no guaranteed delivery, and no ordering guarantee. If a peer is disconnected when an event is published, it will not receive it (unless the event is retained).

**No backpressure.** High-frequency publishers will flood all matching subscribers. Throttle on the publisher side if needed.
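
A publisher-side throttle can be as small as the sketch below. It is a hypothetical helper, not part of the Event Bus API: it fires on the leading edge and drops calls that arrive within the interval, so the very last call in a burst may be lost. The `now` parameter is an injectable clock added for testability.

```javascript
// Hypothetical leading-edge throttle: runs fn at most once per intervalMs.
function throttle(fn, intervalMs, now = Date.now) {
  let last = -Infinity
  return (...args) => {
    const t = now()
    if (t - last >= intervalMs) {
      last = t
      fn(...args)
    }
    // Calls inside the interval are dropped, which suits fire-and-forget events.
  }
}

// Example (assumes `group` is a ready Group):
// const publishCursor = throttle((x, y) => group.publish('cursor:move', { x, y }), 50)
```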

**Callbacks are synchronous.** Subscriber callbacks are invoked directly, one after another. A slow callback blocks delivery to subsequent subscribers on the same peer. Keep callbacks fast, and defer heavy work with `setTimeout` or hand it to a [WorkerPool](worker-pool.md) job.
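
One way to keep a callback fast is to wrap the heavy handler so that it runs on a later tick. The wrapper below is a sketch; `deferred` and its `schedule` parameter are hypothetical, and the default scheduler is a plain `setTimeout(fn, 0)`.

```javascript
// Hypothetical wrapper: the returned callback returns immediately and
// runs the real handler later, so other subscribers are not blocked.
function deferred(handler, schedule = (fn) => setTimeout(fn, 0)) {
  return (event) => {
    schedule(() => {
      try {
        handler(event)
      } catch (err) {
        // Errors surface here rather than inside the Event Bus dispatch loop.
        console.error('event handler failed:', err)
      }
    })
  }
}

// Example (assumes `group` is a ready Group and `indexFile` is your own slow function):
// group.subscribe('file:changed', deferred((event) => indexFile(event.data.path)))
```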

**Retained events are per-publisher.** Each peer stores its own retained events. When a publisher goes offline, its retained events are lost for future subscribers. Retained events are not a persistence mechanism — use WebDAV for durable state.

For the full set of patterns and gotchas, see [App Patterns & Field Notes](app-patterns-field-notes.md#event-bus--decentralized-pubsub).
