# Architecture Review — EdgeBits Platform

> Scope: Edge Gateway · Edge Core · Edge Event Engine · Edge Buffer · Edge-Sync · Edge Manager · Edge SDK · Edge Manager SDK · Edge UI · Edge Manager UI  
> Reviewed: 2026-05-08 (`dev0` @ befa42e1)  
> Reviewer: Chief Architect

---

## Summary Table

| # | Area | Issue | Severity | Category |
|---|------|-------|----------|----------|
| **Security** | | | | |
| S1 | Edge Manager API | Zero authentication — all routes open, actor hardcoded `"admin"` | P0 | Security |
| S2 | Edge Manager API | API key sent by edge-sync but never validated server-side | P0 | Security |
| S3 | Edge — MQTT Broker | No credentials, no TLS, no ACLs — full UNS visible to any LAN host | P0 | Security |
| S4 | Edge — Ingest endpoint | No auth, no rate limit — any host can flood the UNS | P1 | Security |
| S5 | Edge — Event Engine | No MQTT ACLs: broker access = PLC write access (OT/IT boundary collapse) | P1 | Security |
| S6 | Edge Manager SDK | `ClaimCode` included in all `Node` List/Get responses | P1 | Security |
| S7 | Edge — Config Tree | SMTP password stored plaintext in config tree | P2 | Security |
| S8 | Edge Manager API | Heartbeat spoofing — any caller can inject telemetry for a known `node_id` | P0 | Security |
| **Resilience** | | | | |
| R1 | Edge Manager — store | All Postgres `Exec()` errors silently discarded — 200 OK on data loss | P0 | Resilience |
| R2 | Edge — MQTT | No reconnect loop — broker restart silently kills all subscriptions forever | P1 | Resilience |
| R3 | Edge — Buffer | On TimescaleDB outage: buffer retries but no backpressure to pipeline — unbounded memory growth | P1 | Resilience |
| R4 | Edge-Sync — queue | `break` on first drain failure — healthy items blocked by one bad item | P1 | Resilience |
| R5 | Edge — Config Tree | `delete()` only removes direct children — grandchildren orphaned permanently | P1 | Resilience |
| R6 | Edge — S7 connector | Stale PLC connection returns silent empty list — data gap is invisible | P1 | Resilience |
| R7 | Edge — TimescaleDB | Dedup index creation silently skipped on failure — QoS 1 duplicates allowed | P2 | Resilience |
| **Correctness** | | | | |
| C1 | Edge Manager — scheduler | `HandleDeployPipeline` task is a no-op — asynq provides zero resilience | P1 | Correctness |
| C2 | Edge — Pipeline Engine | Webhook in inline `event_processor` pipeline stage is dead code | P1 | Correctness |
| C3 | Edge — Rules engine | `DeadbandRule` always returns False — `ctx.previous_value` never populated | P1 | Correctness |
| C4 | Edge — Rules engine | `ChainRule` sub_rules never injected at call site — AND/OR composition non-functional | P1 | Correctness |
| C5 | Edge — Rules engine | `ThresholdRule` ignores `operator` field — only min/max, operator stored but unused | P2 | Correctness |
| C6 | Edge Manager — nodes | Direct-mode claim flow broken — `ClaimedAt` set at registration, `Claim()` always returns `already_claimed` | P1 | Correctness |
| C7 | Edge UI — api.ts | `files.upload` bypasses `request()` — upload errors invisible, `r.ok` never checked | P2 | Correctness |
| C8 | Edge Manager UI — useApi | `fetcher` arrow function recreates every render — potential infinite re-render | P2 | Correctness |
| C9 | Go store — JSON | `json.Unmarshal` errors silently swallowed in all `scan*` helpers — corrupted rows served as empty data | P2 | Correctness |
| C10 | Edge — Buffer | Pagination cursor uses `time <=` at boundary — oldest row duplicated across pages | P2 | Correctness |
| **Industrial IoT / OT Safety** | | | | |
| OT1 | Edge — Rules engine | Rules evaluate raw value without checking `Quality` field — Quality.BAD readings trigger actions | P1 | OT Safety |
| OT2 | Edge — SetpointAction | No range or type validation before writing to PLC — any rule value is written unconditionally | P1 | OT Safety |
| OT3 | Edge — S7 connector | Read errors publish `value=0` with Quality.BAD — ThresholdRule ignores quality, triggers spurious alarms | P1 | OT Safety |
| OT4 | Edge — Ingest → MQTT | REST POST → MQTT → PLC write is an unguarded IT-to-OT bridge | P1 | OT Safety |
| OT5 | Edge — Pipeline Engine | `_Aggregator` uses `time.time()` wall clock, not envelope timestamp — incorrect aggregation for backfilled data | P2 | OT Safety |
| **Performance** | | | | |
| P1 | Edge — Orchestrator | O(N) SQLite round-trips per pipeline save — one `config.get()` per service | P2 | Performance |
| P2 | Edge Manager — nodes | N+1 `buildLocationPath` query per node in list — 250 queries for 50 nodes × 5-level tree | P2 | Performance |
| P3 | Edge-Sync — pipeline sync | Full pipeline list sent every 30s with no change detection | P2 | Performance |
| P4 | Edge — TimescaleDB | `ARRAY_AGG(value ORDER BY time DESC)[1]` per tag — expensive on high-cardinality chunks | P3 | Performance |
| **SDK Design** | | | | |
| SDK1 | Both SDKs | Double-decode fallback in every `List()` — signals non-deterministic server envelope shape | P2 | SDK Design |
| SDK2 | Both SDKs | No pagination on any list endpoint or SDK method | P2 | SDK Design |
| SDK3 | Edge SDK | No `X-Request-ID` propagation — multi-hop traces broken | P3 | SDK Design |
| **Code Quality** | | | | |
| Q1 | Edge — DB migrations | Single `executescript()` blob, no version table — schema changes require manual ALTER on all nodes | P2 | Code Quality |
| Q2 | Edge Manager — DB migrations | Two SQL files applied at startup, no version tracking, no per-migration transactions | P2 | Code Quality |
| Q3 | Edge UI — api.ts | `Partial<Pipeline>` for create/update — required fields not enforced at compile time | P3 | Code Quality |
| Q4 | Edge UI — file sizes | `pipeline-editor.tsx` 1891 lines, `dashboard.tsx` 1012 lines — untestable, merge-conflict prone | P2 | Code Quality |
| Q5 | Edge UI — silent catch | 20+ `.catch(() => {})` bypass the global `api-error` system | P2 | Code Quality |
| Q6 | Edge Manager UI — page sizes | `pipeline-editor.tsx` 775 lines, `topology.tsx` 480 lines — over 200-line limit | P3 | Code Quality |

---

## Security

### S1 — Edge Manager API: Zero Authentication (P0)

Every route in `edge-manager/api/` is open with no auth middleware. The chi stack in `cmd/server/main.go` is:
```
fasten.RequestID → mw.CORS → fasten.APILogger → mw.Recovery → chimw.RealIP
```
No token validation, no session check. Every mutating handler hardcodes `"admin"` as the audit actor:
```go
// nodes.go, pipelines.go, handlers — throughout
"admin", // FIXME: wire to auth context
```

**Fix**: Add auth middleware ahead of the route group. For machine-to-machine callers, validate `X-API-Key` against a hashed secret supplied via an environment variable; use JWT for human users (already on the roadmap).

---

### S2 / S8 — API Key Ignored + Heartbeat Spoofing (P0)

`sdk/edge-manager/client.go` sets `X-API-Key` on every request, but no middleware reads or validates it. The header is silently accepted and ignored.

`POST /api/v1/heartbeat` (`nodes.go`) accepts `node_id` from the request body and trusts it unconditionally. Any HTTP client that knows a valid `node_id` (trivially enumerable from the open `GET /api/v1/nodes`) can:
- Report fake CPU/memory to mask a compromised node
- Override pipeline stats to suppress drift detection
- Flip drift flags to block legitimate redeployments

**Fix**: Once S1 auth is in place, bind the API key to the `node_id` at registration. Heartbeat must be rejected if `node_id` does not match the authenticated key.

---

### S3 — MQTT Broker: No Auth, No ACLs, No TLS (P0)

`edge/core/transport/event_bus.py` connects to Mosquitto with no credentials. No Compose override enforces authentication. Any process on the same Docker network or factory LAN can:
- Subscribe to `uns/#` — read all sensor data in real time
- Publish fake envelopes to any pipeline topic — inject false PLC readings
- Send `cmd/{service_id}/config/updated` — reconfigure any connector

In an industrial environment where the edge box is on the plant floor LAN, this is a critical OT exposure.

**Fix**: Enable Mosquitto password auth and ACL file. Connectors get per-instance credentials with write ACL on their own topic only. Gateway gets a credential with write on `uns/#` and read on `cmd/#`. TLS between edge-sync and any external MQTT is mandatory.

---

### S4 — Ingest Endpoint: No Auth, No Rate Limit (P1)

`edge/gateway/src/routes/ingest.py`:
```python
@router.post("")
async def ingest_single(payload: IngestPayload, request: Request) -> dict:
```

Registered with the comment `# service→service ingest stays open` in `main.py:557`. There is no authentication, rate limiting, or source validation. Any process on the Docker network can flood the MQTT bus (and, downstream, TimescaleDB) and trigger event rules, including setpoint writes to PLCs.

**Fix**: At minimum, require `X-Block-Token` validated against the shared secret generated at gateway startup (`app.state.block_token`). Add per-IP rate limiting.
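
A minimal sketch of the two checks, assuming the gateway can read the client IP and the `X-Block-Token` header before dispatching to `ingest_single`. `IngestGuard` and its parameters are hypothetical names, not existing gateway code; the limiter is a per-IP token bucket, and the token comparison uses `hmac.compare_digest` to avoid timing leaks.

```python
import hmac
import time
from typing import Callable, Dict, Optional, Tuple


class IngestGuard:
    """Hypothetical pre-handler check for POST /api/v1/ingest."""

    def __init__(self, expected_token: str, rate: float, burst: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._expected = expected_token.encode()
        self._rate = rate            # tokens refilled per second, per client IP
        self._burst = float(burst)   # bucket capacity
        self._clock = clock
        # client IP -> (tokens left, time of last refill); unbounded in this sketch
        self._buckets: Dict[str, Tuple[float, float]] = {}

    def check(self, client_ip: str, token: Optional[str]) -> int:
        """Return the HTTP status to use: 429, 401, or 200 (allowed)."""
        now = self._clock()
        tokens, last = self._buckets.get(client_ip, (self._burst, now))
        tokens = min(self._burst, tokens + (now - last) * self._rate)
        if tokens < 1.0:
            self._buckets[client_ip] = (tokens, now)
            return 429
        # Charge the request before auth so invalid tokens are rate-limited too.
        self._buckets[client_ip] = (tokens - 1.0, now)
        if token is None or not hmac.compare_digest(token.encode(), self._expected):
            return 401
        return 200
```

In FastAPI this would typically run as a dependency on the ingest router, with the expected token taken from `app.state.block_token`. The per-IP dictionary grows without bound here; a production version would evict idle entries.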

---

### S5 — MQTT Broker Access = PLC Write Access (P1)

`SetpointAction` in `edge/event-engine/actions/builtins.py` publishes to `cmd/{connector_id}/write`. Connectors subscribe to this topic and execute PLC writes. There is no additional authorization between MQTT publish and PLC write.

Combined effect of S3 (no MQTT auth) + S5: any unauthenticated process on the LAN can publish to `cmd/s7-connector/write` and cause a Siemens PLC to execute a setpoint write.

**Fix**: S3 fix (MQTT ACLs) is the primary gate. Additionally, setpoint commands should carry a cryptographic signature that connectors verify before executing.
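
One way to implement the signature, sketched under assumptions: each connector shares an HMAC key with the event engine (key distribution is out of scope here), commands are JSON, and a timestamp bounds the replay window. `sign_command`, `verify_command`, and `MAX_SKEW_SEC` are hypothetical. HMAC-SHA256 is a symmetric MAC rather than a public-key signature, which keeps verification cheap on the connector side.

```python
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional

MAX_SKEW_SEC = 30.0  # hypothetical freshness window; limits (but does not eliminate) replay


def _canonical(body: Dict[str, Any]) -> bytes:
    # Stable serialization so signer and verifier hash identical bytes.
    return json.dumps(body, sort_keys=True, separators=(",", ":")).encode()


def sign_command(key: bytes, command: Dict[str, Any],
                 now: Optional[float] = None) -> Dict[str, Any]:
    body = dict(command, ts=time.time() if now is None else now)
    return {"body": body, "sig": hmac.new(key, _canonical(body), hashlib.sha256).hexdigest()}


def verify_command(key: bytes, message: Dict[str, Any],
                   now: Optional[float] = None) -> Optional[Dict[str, Any]]:
    """Return the command body if signature and timestamp check out, else None."""
    body, sig = message.get("body"), message.get("sig")
    if not isinstance(body, dict) or not isinstance(sig, str):
        return None
    expected = hmac.new(key, _canonical(body), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, sig):
        return None
    ts = body.get("ts")
    now = time.time() if now is None else now
    if not isinstance(ts, (int, float)) or abs(now - ts) > MAX_SKEW_SEC:
        return None
    return body
```

A connector would drop (and log) any `cmd/{connector_id}/write` message for which `verify_command` returns `None`. Fully closing the replay window would additionally require tracking recently seen nonces.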

---

### S6 — ClaimCode Exposed in Node List (P1)

`sdk/edge-manager/resources.go` — `Node.ClaimCode` is part of the serialized struct returned by `GET /api/v1/nodes`. Claim codes for every registered node are visible to any caller with API access.

**Fix**: Exclude `ClaimCode` from the shared `Node` serialization (`json:"-"`) so list/get responses never carry it, and return it only through a dedicated response type for the `POST /nodes` registration call.

---

### S7 — SMTP Password Plaintext in Config Tree (P2)

`edge/event-engine/actions/builtins.py` reads SMTP credentials from the config tree via HTTP:
```python
password = smtp.get("password", "")
```

The config tree is stored in SQLite with no encryption at rest. Anyone with filesystem access to `config.db` reads SMTP credentials in plaintext. In a factory environment where SD cards are physically accessible, this is a meaningful exposure.

**Fix**: Support `SMTP_PASS` via environment variable reference. For values that must be stored, AES-encrypt sensitive config keys at write time or use a secrets backend.
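
A minimal sketch of the environment-variable reference, assuming a hypothetical `env:` prefix convention (a config value of `env:SMTP_PASS` means "read `SMTP_PASS` from the process environment"). `resolve_secret` is illustrative, not existing code; a missing variable fails loudly rather than silently sending an empty password.

```python
import os
from typing import Mapping, Optional

ENV_PREFIX = "env:"  # hypothetical convention for secret references in the config tree


def resolve_secret(value: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the literal value, or the referenced environment variable's value."""
    env = os.environ if environ is None else environ
    if not value.startswith(ENV_PREFIX):
        return value  # legacy literal (plaintext) value, passed through unchanged
    name = value[len(ENV_PREFIX):]
    try:
        return env[name]
    except KeyError:
        raise KeyError(f"config references unset environment variable {name!r}") from None
```

The action would then read `password = resolve_secret(smtp.get("password", ""))`, and only the reference string is stored in `config.db`.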

---

## Resilience

### R1 — Postgres Store Silently Discards Write Errors (P0)

Every `Put()` and `Delete()` method in `edge-manager/api/internal/store/postgres.go` discards the error from `pool.Exec()`:

```go
// postgres.go — locations, nodes, templates, pipelines, deployments, audit
s.pool.Exec(c, `INSERT INTO locations ... ON CONFLICT ...`, ...)  // error dropped
s.pool.Exec(c, `DELETE FROM nodes WHERE id = $1`, id)            // error dropped
```

The HTTP handler returns `201 Created` or `200 OK`. The data was never written. This pattern repeats across all six stores. A Postgres connection pool exhaustion, constraint violation, or disk-full silently succeeds from the caller's perspective.

**Fix**: Check every error return. Return HTTP 500 if the write failed. Confirm `rowsAffected > 0` for upserts. One-line fix per call site, massive data integrity impact.

---

### R2 — MQTT Reconnect Not Implemented (P1)

`edge/core/transport/event_bus.py` — `_listen()` runs as a background task. When the MQTT broker restarts, `_listen()` raises; the handler sets `self._connected = False` and exits. There is no reconnect loop:

```python
async def _listen(self) -> None:
    try:
        ...  # message loop elided
    except Exception as exc:
        log.error("listen loop error: %s", exc)
        self._connected = False  # exits; never re-arms
```

The gateway stays running and responds to health checks, but zero MQTT messages are processed. Pipeline subscriptions are silently dead indefinitely. In a factory environment, Mosquitto restarts are not uncommon (config push, OOM on Pi, power glitch).

**Fix**: Wrap `_listen()` with an exponential-backoff reconnect loop. On reconnect, re-subscribe all active pipeline topics. Health endpoint should report `"degraded"` while MQTT is disconnected.
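
A sketch of the reconnect loop with exponential backoff and jitter. The broker operations are passed in as callables because the MQTT client API used by `event_bus.py` is not shown here; `listen_forever` and all of its parameters are hypothetical. The `set_connected` callback is where the health endpoint's `"degraded"` state would come from.

```python
import asyncio
import logging
import random
from typing import Awaitable, Callable, Iterable

log = logging.getLogger("event_bus")


async def listen_forever(
    connect: Callable[[], Awaitable[None]],
    subscribe: Callable[[str], Awaitable[None]],
    consume: Callable[[], Awaitable[None]],
    topics: Callable[[], Iterable[str]],
    set_connected: Callable[[bool], None],
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> None:
    delay = base_delay
    while True:
        try:
            await connect()
            for topic in topics():           # re-arm every active pipeline subscription
                await subscribe(topic)
            set_connected(True)
            delay = base_delay               # reset backoff after a successful connect
            await consume()                  # returns or raises when the broker goes away
        except asyncio.CancelledError:
            raise                            # shutdown: do not swallow cancellation
        except Exception as exc:
            log.error("listen loop error: %s", exc)
        set_connected(False)
        await asyncio.sleep(delay * random.uniform(0.5, 1.0))  # jittered backoff
        delay = min(delay * 2, max_delay)
```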

---

### R3 — Buffer Retries But Has No Backpressure Signal (P1)

`edge/buffer/manager.py` — on TimescaleDB failure the batch is put back at the front of the buffer:
```python
try:
    ...  # write batch to TimescaleDB
except Exception as exc:
    self._buf = batch + self._buf  # retry: put batch back at front
    log.error("flush_error", ...)
```

The pipeline engine has no visibility into buffer pressure. When TimescaleDB is unavailable for minutes, `self._buf` grows without bound in-process. On a Raspberry Pi with 1GB RAM, sustained outage causes OOM kill — which loses all buffered envelopes and stops the entire edge stack.

**Fix**: Expose a `buffer_pressure()` property (e.g., `len(self._buf) / max_buf`). Pipeline engine checks before writing and emits Quality.DEGRADED instead of ingesting when buffer is at capacity. Log a warning at 50%, error at 80%.
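
A minimal sketch of a bounded buffer that exposes `buffer_pressure()` and logs on the 50% and 80% transitions. `BoundedBuffer`, `offer`, `take`, and `requeue_front` are hypothetical names. Dropping the oldest envelopes when a failed batch no longer fits is an assumed policy, chosen so memory stays bounded even during a long TimescaleDB outage.

```python
import logging
from typing import Generic, List, TypeVar

log = logging.getLogger("buffer")
T = TypeVar("T")


class BoundedBuffer(Generic[T]):
    """Hypothetical in-process buffer with a hard cap and a pressure signal."""

    WARN_AT = 0.5
    ERROR_AT = 0.8

    def __init__(self, max_buf: int) -> None:
        self._buf: List[T] = []
        self._max = max_buf
        self._level = 0  # 0 = normal, 1 = warned, 2 = error

    def buffer_pressure(self) -> float:
        return len(self._buf) / self._max

    def offer(self, item: T) -> bool:
        """Append unless full; False tells the pipeline to shed or flag the reading."""
        if len(self._buf) >= self._max:
            return False
        self._buf.append(item)
        self._log_transition()
        return True

    def take(self, n: int) -> List[T]:
        """Remove and return up to n of the oldest items for a flush attempt."""
        batch, self._buf = self._buf[:n], self._buf[n:]
        self._log_transition()
        return batch

    def requeue_front(self, batch: List[T]) -> int:
        """Put a failed batch back at the front, dropping the oldest overflow.
        Returns the number of items dropped."""
        merged = batch + self._buf
        dropped = max(0, len(merged) - self._max)
        self._buf = merged[dropped:]
        if dropped:
            log.error("buffer_overflow dropped=%d", dropped)
        self._log_transition()
        return dropped

    def _log_transition(self) -> None:
        # Log only when pressure crosses into a higher band, not on every append.
        p = self.buffer_pressure()
        level = 2 if p >= self.ERROR_AT else 1 if p >= self.WARN_AT else 0
        if level > self._level:
            (log.error if level == 2 else log.warning)("buffer_pressure %.0f%%", p * 100)
        self._level = level
```

The pipeline engine would treat `offer()` returning `False` as the signal to emit Quality.DEGRADED instead of ingesting.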

---

### R4 — Queue Drain Stops on First Failure (P1)

`edge/edge-sync/internal/poller/poller.go`:
```go
break // stop draining on first failure
```

Item 3 fails (bad payload), so items 4–50 are never attempted this cycle. A single item with a permanent error stalls the queue on every cycle, and a transient failure on one item blocks items whose delivery would have succeeded.

**Fix**: `continue` for items with permanent errors (4xx from Edge Manager — mark as dead-letter). Only `break` on transport failures where all subsequent items will also fail.

---

### R5 — Config Tree Delete Leaks Grandchildren (P1)

`config_service.py`:
```sql
DELETE FROM config_tree WHERE path=? OR parent_path=?
```

This removes only direct children. Grandchildren at `services/svc/connection/host` survive because their `parent_path` is `services/svc/connection`, not `services/svc`. After `_stop_and_purge`, orphaned keys accumulate and get picked up as phantom services on the next orchestrator reconcile.

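**Fix**: Replace with a prefix sweep that removes descendants at any depth, e.g. `WHERE path = ? OR path LIKE ? || '/%'`. Two caveats with `LIKE`: `_` and `%` inside the prefix act as wildcards (deleting `services/svc_1` would also match `services/svcX1/...`), and SQLite only uses an index for `LIKE` under specific collation and pattern conditions. A range comparison on `path` avoids both.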
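
A minimal sketch of the prefix sweep written as a range comparison, using the standard `sqlite3` module. It relies on `'0'` (0x30) sorting immediately after `'/'` (0x2F) under SQLite's default `BINARY` collation, so every descendant of `prefix` falls in `[prefix + '/', prefix + '0')` and the predicate can use an index on `path`. `delete_subtree` is a hypothetical helper; only the `path` column is assumed.

```python
import sqlite3


def delete_subtree(conn: sqlite3.Connection, prefix: str) -> int:
    """Hypothetical helper: delete `prefix` and every descendant key, at any depth."""
    # Under BINARY collation every path starting with prefix + "/" sorts in
    # [prefix + "/", prefix + "0"), because "0" (0x30) follows "/" (0x2F).
    # No LIKE wildcards are involved, so "_" or "%" in an ID match literally.
    cur = conn.execute(
        "DELETE FROM config_tree WHERE path = ? OR (path >= ? AND path < ?)",
        (prefix, prefix + "/", prefix + "0"),
    )
    return cur.rowcount
```

Sibling keys such as `services/svc2` survive because they compare greater than `services/svc0`.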

---

### R6 — S7 Stale Connection: Silent Empty List (P1)

`blocks/connectors/s7/src/connector.py`:
```python
async def _read_all_tags(self, dev: DeviceConfig) -> list[Envelope]:
    client = self._clients.get(dev.id)
    if not client or not client.get_connected():
        return []  # silent — no log, no reconnect, no Quality.BAD
```

When the snap7 connection goes stale (TCP timeout, PLC restart), `_read_all_tags` returns an empty list silently. From the pipeline perspective, the tag simply stops producing values — indistinguishable from a slow tag with no change. A dead PLC connection looks like normal operation.

**Fix**: On `not client.get_connected()`, attempt reconnect with backoff. If reconnect fails, publish Quality.BAD envelopes for all configured tags. Log `reconnect_failed` with device ID.

---

### R7 — TimescaleDB Dedup Index Silently Skipped (P2)

`edge/core/storage/timescale.py` — if dedup index creation fails, a warning is logged and the table is left unindexed:
```python
try:
    ...  # create idx_telem_dedup
except Exception as e:
    log.warning("dedup index creation skipped: %s", e)
```

Subsequent MQTT QoS 1 re-deliveries or edge-sync replays insert duplicate rows silently. Dashboard charts show doubled values.

**Fix**: Fail loud on dedup index creation failure. Do not allow TimescaleManager to mark itself initialized without the index.

---

## Correctness

### C1 — DeployPipeline Asynq Task Is a No-Op (P1)

`edge-manager/api/internal/scheduler/handlers.go`:
```go
_ = node
_ = pipe
// Deploy logic is in services/deployer.go — this handler ensures
// the task survives restarts.
```

The deploy already happened synchronously in the HTTP handler before this task was enqueued, and the task body does nothing. Whether it runs, is retried, or is lost when Redis restarts therefore makes no difference: asynq provides zero resilience for the deploy.

**Fix**: Move the deploy logic into the task handler (making it truly async and restart-safe), or remove the task enqueue entirely and document that deployment is synchronous only.

---

### C2 — Webhook Dead Code in Inline Pipeline Stage (P1)

`edge/gateway/src/services/pipeline_engine.py`:
```python
case "webhook":
    url = action.get("url", "")
    if url:
        log.info("webhook_fire", ...)
        # Phase 2: actual HTTP POST via aiohttp
```

The standalone Event Engine (`edge/event-engine/actions/builtins.py`) correctly implements webhook via `httpx.AsyncClient` — that path is functional. This finding applies only to the `event_processor` stage type embedded inside pipeline definitions. The rule fires. A log line appears. No HTTP call is made. No UI warning exists.

**Fix**: Implement the HTTP call in `pipeline_engine.py`, or remove the `event_processor` stage type from the pipeline editor and direct users to the standalone Event Engine.

---

### C3 — DeadbandRule Always Returns False (P1)

`edge/event-engine/engine.py`:
```python
ctx = RuleContext(
    tag=env.tag, value=env.value,
    timestamp=..., source=env.source,
    # previous_value is never set — defaults to None
)
```

`edge/core/rules/rule_types.py`:
```python
class DeadbandRule:
    def evaluate(self, ctx: RuleContext) -> bool:
        if ctx.previous_value is None:
            return False  # ← always reached
        return abs(ctx.value - ctx.previous_value) > self.cfg.threshold
```

`DeadbandRule` — a core industrial pattern (fire only when value changes significantly) — has never fired. Every configured deadband rule is silently non-functional.

**Fix**: The `EventEngine` must track `previous_value` per `(rule_id, source, tag)` triplet. Populate `ctx.previous_value` from this tracked state before calling `evaluator.evaluate(ctx)`.
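
A minimal sketch of the tracking, using a dataclass stand-in for `RuleContext` (the real one is a Pydantic model, where `model_copy(update=...)` in Pydantic v2 would play the role of `dataclasses.replace`). `PreviousValueTracker` is hypothetical.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class Ctx:
    """Hypothetical stand-in for RuleContext (only the fields used here)."""
    tag: str
    value: Any
    source: str
    previous_value: Any = None


class DeadbandRule:
    """Same logic as the evaluate() excerpt above."""

    def __init__(self, threshold: float) -> None:
        self.threshold = threshold

    def evaluate(self, ctx: Ctx) -> bool:
        if ctx.previous_value is None:
            return False
        return abs(ctx.value - ctx.previous_value) > self.threshold


class PreviousValueTracker:
    """Remembers the last value per (rule_id, source, tag) triplet."""

    def __init__(self) -> None:
        self._last: Dict[Tuple[str, str, str], Any] = {}

    def with_previous(self, rule_id: str, ctx: Ctx) -> Ctx:
        key = (rule_id, ctx.source, ctx.tag)
        enriched = replace(ctx, previous_value=self._last.get(key))
        self._last[key] = ctx.value  # becomes previous_value for the next sample
        return enriched
```

This compares each sample with the immediately preceding one, as the fix describes. A deadband is often defined against the last value that fired instead, so slow drift of less than `threshold` per sample still triggers eventually; which behaviour is wanted should be decided per rule.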

---

### C4 — ChainRule Sub-Rules Never Injected (P1)

`edge/event-engine/engine.py`:
```python
self._evaluators[rule.id] = create_builtin_rule(rule.rule_type, config)
```

`create_builtin_rule` passes only `config` — no `sub_rules` kwarg. `ChainRule.__init__` gets `sub_rules={}`. `evaluate()` iterates over `self.cfg.rule_ids` but `self.sub_rules.get(rule_id)` returns None for all — fired count stays 0, AND mode returns True (vacuous AND), OR mode returns False.

`ChainRule.set_sub_rules()` exists but is never called. The AND/OR composite alarm feature is completely non-functional.

**Fix**: `engine.register()` must resolve sub-rule IDs from `rule.rule_config["rule_ids"]` and inject the corresponding evaluators via `chain_rule.set_sub_rules(...)`.
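
A minimal sketch of the injection step. `ChainRule` here is a simplified stand-in that keeps the `set_sub_rules()` hook and `rule_ids` list described above; `inject_sub_rules` is hypothetical. Unknown sub-rule IDs raise instead of being silently skipped, which also surfaces the case where a chain is registered before its sub-rules (a second registration pass would remove that ordering dependency).

```python
from typing import Any, Dict, List, Protocol


class Evaluator(Protocol):
    def evaluate(self, ctx: Any) -> bool: ...


class ChainRule:
    """Simplified stand-in: rule_ids + mode, with the same set_sub_rules() hook."""

    def __init__(self, rule_ids: List[str], mode: str = "and") -> None:
        self.rule_ids = rule_ids
        self.mode = mode
        self.sub_rules: Dict[str, Evaluator] = {}

    def set_sub_rules(self, sub_rules: Dict[str, Evaluator]) -> None:
        self.sub_rules = sub_rules

    def evaluate(self, ctx: Any) -> bool:
        # KeyError here means injection was skipped: fail loudly, not vacuously.
        results = [self.sub_rules[rid].evaluate(ctx) for rid in self.rule_ids]
        return all(results) if self.mode == "and" else any(results)


def inject_sub_rules(evaluators: Dict[str, Evaluator], chain: ChainRule) -> None:
    """Resolve chain.rule_ids against already-registered evaluators."""
    missing = [rid for rid in chain.rule_ids if rid not in evaluators]
    if missing:
        raise ValueError(f"chain references unregistered rules: {missing}")
    chain.set_sub_rules({rid: evaluators[rid] for rid in chain.rule_ids})
```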

---

### C5 — ThresholdRule `operator` Field Unused (P2)

`ThresholdConfig` exposes `operator: str = "gt"` accepting `gt | lt | gte | lte | eq | neq` in the UI rule editor. `ThresholdRule.evaluate()` ignores it:
```python
if self.cfg.max is not None and v > self.cfg.max:  # always `>`
    return True
if self.cfg.min is not None and v < self.cfg.min:  # always `<`
```

Users configuring `operator=eq` or `operator=lte` silently get the min/max range check instead of the comparison they selected.

**Fix**: Add operator dispatch using the existing `_OPERATORS` dict in `rule_types.py`.
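
A minimal sketch of operator dispatch. `OPERATORS` mirrors what the existing `_OPERATORS` table presumably contains, and the `threshold` field (the comparison value) is an assumed name since the section does not show it; the legacy `min`/`max` path is kept for configs that do not set one.

```python
import operator
from dataclasses import dataclass
from typing import Callable, Dict, Optional

# Hypothetical mirror of the existing _OPERATORS table in rule_types.py.
OPERATORS: Dict[str, Callable[[float, float], bool]] = {
    "gt": operator.gt, "lt": operator.lt, "gte": operator.ge,
    "lte": operator.le, "eq": operator.eq, "neq": operator.ne,
}


@dataclass
class ThresholdConfig:
    """Hypothetical shape; `threshold` is an assumed field name."""
    operator: str = "gt"
    threshold: Optional[float] = None
    min: Optional[float] = None
    max: Optional[float] = None


def threshold_fires(cfg: ThresholdConfig, v: float) -> bool:
    if cfg.threshold is not None:
        try:
            op = OPERATORS[cfg.operator]
        except KeyError:
            raise ValueError(f"unknown operator {cfg.operator!r}") from None
        return op(v, cfg.threshold)
    # Legacy range form, unchanged from the current behaviour.
    if cfg.max is not None and v > cfg.max:
        return True
    return cfg.min is not None and v < cfg.min
```

Exact `eq`/`neq` comparisons on analog float readings rarely behave as users expect; a tolerance may be worth adding for those two operators.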

---

### C6 — Direct-Mode Claim Flow Broken (P1)

`edge-manager/api/internal/handlers/nodes.go`:
```go
ClaimedAt: time.Now(), // direct registration = immediate claim
```

`Claim()` checks `if !found.ClaimedAt.IsZero()` → always returns `"already_claimed"`. The claim code flow is unreachable for direct-mode nodes — they are pre-claimed at registration time.

**Fix**: Do not set `ClaimedAt` at registration. Set it only when `Claim()` is called with a valid claim code, or when the first authenticated heartbeat is received.

---

### C7 — `files.upload` Bypasses Error Handling (P2)

`edge/ui/src/lib/api.ts`:
```ts
return fetch(`${BASE}/files`, { method: 'POST', body: form }).then(r => r.json())
```

1. `r.ok` is never checked — a 413 or 500 is parsed as JSON and returned as "success"
2. Network errors never fire `api-error` — upload failures are invisible
3. Return type is `Promise<any>`, not typed

**Fix**: Move file upload into `request()` or a dedicated helper that checks `res.ok` and dispatches `api-error`.

---

### C8 — `useApi` Infinite Re-render Risk (P2)

`edge-manager/ui/src/hooks/useApi.ts`:
```ts
const load = useCallback(async () => { ... }, [fetcher, opts?.label]);
```

Every caller passes an inline arrow: `useApi(() => nodeService.list())`. Inline arrow functions are recreated on every render, so the `fetcher` reference changes → `load` is recreated → the effect re-runs → its state update triggers another render → infinite loop. This is safe only if callers wrap their fetcher in `useCallback`, which none currently do.

**Fix**: Accept a stable dependency array from the caller instead of depending on the function reference, or use a ref for the fetcher.

---

### C9 — Go Store JSON Unmarshal Errors Silently Swallowed (P2)

`edge-manager/api/internal/store/postgres.go`:
```go
json.Unmarshal(tagsJSON, &loc.Tags)     // error discarded
json.Unmarshal(metaJSON, &loc.Metadata) // error discarded
```

Corrupted JSON is served as empty fields. Combined with R1 (writes silently fail), data corruption is both written silently and read back silently.

**Fix**: Log the unmarshal error with the row ID and field name. Return an error or a structured degraded response so the caller can detect partial data.

---

### C10 — Buffer Pagination Off-by-One at Cursor Boundary (P2)

`edge/gateway/src/routes/buffer.py` — next page cursor uses the oldest row's timestamp:
```python
next_params = {**base_params, "until": str(oldest)}
```

The query uses `time <= until`, so the boundary row (the oldest row on page N) is returned again as the first row of page N+1.

**Fix**: Use `time < until` (strictly less than) only if timestamps are unique; otherwise rows that share the boundary timestamp but did not fit on page N are skipped. Including a row ID in the cursor (a composite `(time, id)` keyset) handles both cases.
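
A minimal sketch of a composite `(time, id)` keyset cursor, demonstrated with the standard `sqlite3` module (row-value comparisons need SQLite 3.15 or later; PostgreSQL supports the same syntax). The `buffer` table and `page` function are hypothetical stand-ins for the route's query. Ordering by `id` as a tie-breaker makes each cursor position unique even when many rows share a timestamp.

```python
import sqlite3
from typing import List, Optional, Tuple

Cursor = Tuple[str, int]  # (time, id) of the last row returned on the previous page


def page(conn: sqlite3.Connection, limit: int,
         cursor: Optional[Cursor] = None) -> Tuple[List[tuple], Optional[Cursor]]:
    """Newest-first page of a hypothetical `buffer(id, time, value)` table."""
    if cursor is None:
        rows = conn.execute(
            "SELECT time, id, value FROM buffer ORDER BY time DESC, id DESC LIMIT ?",
            (limit,),
        ).fetchall()
    else:
        # Strictly "before" the cursor in (time, id) order: no duplicates, no skipped ties.
        rows = conn.execute(
            "SELECT time, id, value FROM buffer WHERE (time, id) < (?, ?) "
            "ORDER BY time DESC, id DESC LIMIT ?",
            (cursor[0], cursor[1], limit),
        ).fetchall()
    next_cursor = (rows[-1][0], rows[-1][1]) if rows and len(rows) == limit else None
    return rows, next_cursor
```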

---

## Industrial IoT / OT Safety

### OT1 — Rules Evaluate Without Checking Quality (P1)

`RuleContext` in `edge/core/rules/rule_types.py` has no quality field:
```python
class RuleContext(BaseModel):
    tag: str
    value: Any
    timestamp: str
    source: str
    previous_value: Any = None
    unchanged_sec: float = 0.0
    # quality: not present
```

A sensor with a broken cable publishes Quality.BAD with `value=0`. `ThresholdRule(min=1.0)` fires. The operator receives an alert and may take incorrect corrective action. In process industries (temperature control, pressure monitoring), false alarms on bad-quality data cause unplanned shutdowns.

**Fix**: Add `quality: str = "good"` to `RuleContext`. Before calling `evaluator.evaluate(ctx)`, check `ctx.quality != "bad"`. Each rule type should explicitly declare whether it fires on degraded quality.
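
A minimal sketch of the gate. The quality strings and the per-rule `fires_on_degraded` opt-in are assumptions (the section only confirms GOOD and BAD), and the `RuleContext` below is a stand-in, not the real model. Readings of any intermediate quality are evaluated only by rules that declare they tolerate it.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Tuple

GOOD, BAD = "good", "bad"  # assumed string forms of Quality.GOOD / Quality.BAD


@dataclass(frozen=True)
class RuleContext:
    """Hypothetical stand-in showing only the fields used here, plus `quality`."""
    tag: str
    value: Any
    quality: str = GOOD


def should_evaluate(rule: Any, ctx: RuleContext) -> bool:
    if ctx.quality == GOOD:
        return True
    if ctx.quality == BAD:
        return False  # never act on a reading the source itself marked bad
    # Intermediate quality (e.g. "uncertain"): only rules that opt in.
    return bool(getattr(rule, "fires_on_degraded", False))


def evaluate_all(rules: Iterable[Tuple[str, Any]], ctx: RuleContext) -> List[str]:
    """Return the IDs of rules that fire for this context."""
    return [rule_id for rule_id, rule in rules
            if should_evaluate(rule, ctx) and rule.evaluate(ctx)]
```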

---

### OT2 — SetpointAction Has No Value Validation (P1)

`edge/event-engine/actions/builtins.py` — `SetpointAction` evaluates `value_expression` via AST walker and publishes the result to `cmd/{connector_id}/write`. No range check, no type check, no rate limit on writes.

A misconfigured expression with `value * 100` could write a value 100× the intended setpoint to a PLC output. On a Siemens S7 driving a motor speed controller, this could cause mechanical damage or safety system activation.

**Fix**: `SetpointAction` config must include `min_value`, `max_value`, and `value_type`. The action validates the computed value before publishing. The S7 connector's `write()` should also validate against tag-level limits.
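
A minimal sketch of the validation `SetpointAction` would run on the computed value before publishing. `SetpointLimits`, `SetpointRejected`, and the `value_type` strings are hypothetical. `bool` is rejected for numeric setpoints because Python treats it as an `int` subclass, and non-finite floats are rejected explicitly rather than relying on comparison semantics.

```python
import math
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class SetpointLimits:
    """Hypothetical per-action config: min_value, max_value, value_type."""
    min_value: float
    max_value: float
    value_type: str = "float"  # "float" | "int" | "bool"


class SetpointRejected(ValueError):
    pass


def validate_setpoint(raw: Any, limits: SetpointLimits) -> Union[bool, int, float]:
    """Return the value to publish, or raise SetpointRejected."""
    if limits.value_type == "bool":
        if isinstance(raw, bool):
            return raw
        raise SetpointRejected(f"expected bool, got {type(raw).__name__}")
    # bool is an int subclass in Python; reject it for numeric setpoints.
    if isinstance(raw, bool) or not isinstance(raw, (int, float)):
        raise SetpointRejected(f"expected number, got {type(raw).__name__}")
    if isinstance(raw, float) and not math.isfinite(raw):
        raise SetpointRejected(f"non-finite value {raw!r}")
    if limits.value_type == "int":
        if raw != int(raw):
            raise SetpointRejected(f"expected integer, got {raw!r}")
        value: Union[int, float] = int(raw)
    elif limits.value_type == "float":
        try:
            value = float(raw)
        except OverflowError:
            raise SetpointRejected(f"value too large: {raw!r}") from None
    else:
        raise SetpointRejected(f"unknown value_type {limits.value_type!r}")
    if not limits.min_value <= value <= limits.max_value:
        raise SetpointRejected(f"{value!r} outside [{limits.min_value}, {limits.max_value}]")
    return value
```

A rejected value should be logged with the rule ID and never published; the connector-side tag limits remain a second, independent check.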

---

### OT3 — S7 Read Error Publishes value=0 with Quality.BAD (P1)

`blocks/connectors/s7/src/connector.py`:
```python
except Exception as exc:
    results.append(Envelope(
        source=f"s7/{dev.id}", tag=tag.id, value=0, unit=tag.unit,
        quality=Quality.BAD, ...
    ))
```

The quality flag is correctly set to BAD. However, since rules do not check quality (OT1), a ThresholdRule configured as `min=5.0` fires on `value=0` from a read error. Additionally, `value=0` is a valid physical reading (flow meter at zero, motor stopped) — distinguishing a legitimate zero from an error zero requires the quality field, which nothing reads.

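**Fix**: Fix OT1 first (quality-aware evaluation). Until then, publishing `value=None` would prevent spurious numeric rule evaluation, provided every rule treats `None` as "no value": in Python 3 a bare comparison such as `None < 5.0` raises `TypeError`.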

---

### OT4 — REST Ingest Is an Unguarded IT-to-OT Bridge (P1)

The data flow `POST /api/v1/ingest → MQTT → pipeline → SetpointAction → PLC write` creates a path from the IT network directly to OT actuators. The ingest endpoint has no authentication (S4), and MQTT has no ACLs (S3). An attacker on the IT LAN can:

1. POST to `/api/v1/ingest` with a crafted envelope on a monitored tag
2. The envelope reaches the event engine as a valid Quality.GOOD reading
3. A ThresholdRule fires and SetpointAction writes to a PLC

In IEC 62443 terms, this is an uncontrolled conduit from the enterprise zone into the control zone, which defeats the zone separation that the security levels assigned to each zone depend on.

**Fix**: S4 and S3 fixes together address this. Setpoint write commands should additionally require a separate authorization token beyond normal data ingest credentials.

---

### OT5 — Aggregator Uses Wall Clock, Not Envelope Timestamp (P2)

`edge/gateway/src/services/pipeline_engine.py` — `_Aggregator` uses `time.time()` to assign time-bucket keys. When historical data is replayed from a file connector or backfilled from an upstream historian, all envelopes get aggregated into the current real-time bucket, not their original timestamp bucket. Statistical aggregations (rolling averages, rate-of-change) are incorrect for any non-real-time data path.

**Fix**: Use `env.timestamp` for the bucket key. Handle out-of-order delivery with a configurable late-arrival window.
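
A minimal sketch of event-time bucketing with a late-arrival window. It assumes `env.timestamp` has already been converted to epoch seconds (for an ISO-8601 string, for example via `datetime.fromisoformat(...).timestamp()`); `EventTimeAggregator` is hypothetical and computes only a mean. The watermark is the highest event time seen, and a bucket is emitted once the watermark has passed its end by more than the late window.

```python
import math
from collections import defaultdict
from typing import Dict, List, Tuple


class EventTimeAggregator:
    """Hypothetical mean-per-bucket aggregator keyed by envelope time, not wall clock."""

    def __init__(self, bucket_sec: int, late_window_sec: int) -> None:
        self.bucket_sec = bucket_sec
        self.late_window_sec = late_window_sec
        self._buckets: Dict[int, List[float]] = defaultdict(list)
        self._watermark = float("-inf")  # highest event time seen so far

    def add(self, event_ts: float, value: float) -> bool:
        """Bucket by the envelope's timestamp; return False if it is too late to accept."""
        if event_ts < self._watermark - self.late_window_sec:
            return False
        self._watermark = max(self._watermark, event_ts)
        start = math.floor(event_ts / self.bucket_sec) * self.bucket_sec
        self._buckets[start].append(value)
        return True

    def close_ready(self) -> List[Tuple[int, float]]:
        """Emit (bucket_start, mean) for buckets that can no longer receive data."""
        horizon = self._watermark - self.late_window_sec
        closed = []
        for start in sorted(self._buckets):
            if start + self.bucket_sec > horizon:
                break
            values = self._buckets.pop(start)
            closed.append((start, sum(values) / len(values)))
        return closed
```

Because the watermark only advances, a backfill that arrives after live data for the same stream would be rejected as late; backfill paths likely need their own aggregator instance.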

---

## Performance

### P1 — Orchestrator O(N) SQLite Round-Trips Per Pipeline Save (P2)

`edge/gateway/src/services/orchestrator/orchestrator.py` — `_services_for_pipeline()` fetches all service IDs then does one `config_service.get()` per service to read `metadata.pipeline_id`. With 50 services, that's 51 sequential SQLite queries every time a pipeline is saved.

**Fix**: Fetch the matching services in a single query:
```sql
SELECT path, value FROM config_tree
WHERE path LIKE 'services/%/metadata'
  AND json_extract(value, '$.pipeline_id') = ?
```

---

### P2 — N+1 Location Path in Node List (P2)

`edge-manager/api/internal/handlers/nodes.go` — `buildLocationPath()` walks the location hierarchy one `Get()` call per level, called for every node in the list. For 50 nodes on a 5-level tree: 250 sequential DB queries before the response is sent.

**Fix**: Single recursive CTE to fetch all location ancestors in one query. Cache the location tree in memory (locations change rarely).

---

### P3 — Pipeline Sync Full Resend Every Cycle (P2)

`edge/edge-sync/internal/poller/poller.go` — every 30 seconds, edge-sync sends the complete local pipeline list with no change detection. Edge Manager deduplicates by `EdgePipelineID` but still executes the write path for each entry.

**Fix**: Track a local hash of the pipeline list (e.g. CRC32 of the serialized JSON, with pipelines in a stable order so unchanged content always produces the same hash). Only transmit if the hash has changed since the last successful sync.

---

### P4 — ARRAY_AGG on High-Cardinality Chunks (P3)

`edge/core/storage/timescale.py` — `TelemetryRepository.timeseries()` uses:
```sql
ARRAY_AGG(value ORDER BY time DESC)[1]
```
For a 1-hour chunk with high-frequency tags, this materializes the entire ordered array before taking the first element.

**Fix**: Use `LAST(value, time)` from TimescaleDB's hyperfunctions — designed exactly for this pattern.

---

## SDK Design

### SDK1 — Double-Decode Fallback Signals Non-Deterministic Server Shape (P2)

Every `List()` method in both SDKs:
```go
if err := json.Unmarshal(data, &resp); err != nil {
    var items []T
    if err2 := json.Unmarshal(data, &items); err2 != nil {
        return nil, fmt.Errorf("...: %w", err)  // returns err not err2 — wrong error
    }
    return items, nil
}
```

The server sometimes wraps (`{"pipelines": [...]}`) and sometimes returns a bare array. The fallback silently papers over this mismatch. The error on double-decode failure returns `err` (first unmarshal error), not `err2`, so debugging is confusing.

**Fix**: Pick one response shape (wrapped preferred, for extensibility) and enforce it everywhere. Remove the double-decode fallback.

---

### SDK2 — No Pagination on Any List Endpoint (P2)

`NodeService.List`, `TemplateService.List`, `LocationService.List`, `DeploymentService.List`, `PipelineService.List` in both SDKs — all return unbounded lists with no `limit`/`offset`. At fleet scale (500+ nodes), a single list call returns all records in one synchronous HTTP response backed by a `SELECT *` with no LIMIT.

**Fix**: Add cursor or offset pagination to all list endpoints. SDK methods should accept `ListOptions{Limit, Offset}`.

---

### SDK3 — No Request ID Propagation in Edge SDK (P3)

`sdk/edge/client.go` — `do()` does not set or propagate `X-Request-ID`. Multi-hop traces across edge-sync → edge-gateway → event-engine are broken at the SDK boundary.

**Fix**: Accept an optional correlation ID in `NewClient` or per-request context. Edge-sync should propagate the poll-cycle ID for all SDK calls within that cycle.

---

## Code Quality

### Q1 / Q2 — No Database Migration Versioning (P2)

**Edge**: `edge/core/storage/sqlite.py` applies one `executescript()` blob with `CREATE TABLE IF NOT EXISTS`. No schema version table. Adding a column to an existing table requires a manual `ALTER TABLE` on every deployed node with no automated migration path. Errors in the manual `ALTER TABLE` attempts are silently swallowed.

**Edge Manager**: Two SQL files applied in `main.go` startup, no version tracking. Same problem: schema drift across deployments is invisible.

**Fix**: `golang-migrate` for Go, `alembic` for Python (or a manual `schema_migrations(version, applied_at)` table). Each migration is a numbered, idempotent file applied in order.
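
For the Python side, a minimal sketch of the `schema_migrations` option using only the standard `sqlite3` module. The migration list and `migrate()` are hypothetical; version 1 uses `IF NOT EXISTS` so it can serve as a baseline on nodes that already have the tables from the current `executescript()` blob. Each migration runs in its own explicit transaction, because `executescript()` issues an implicit `COMMIT` first, while SQLite DDL is otherwise transactional and can be rolled back.

```python
import sqlite3
import time
from typing import Sequence, Tuple

# Hypothetical append-only list of (version, statements); never edit a shipped entry.
MIGRATIONS: Sequence[Tuple[int, Sequence[str]]] = (
    (1, ("CREATE TABLE IF NOT EXISTS config_tree "
         "(path TEXT PRIMARY KEY, parent_path TEXT, value TEXT)",)),
    (2, ("ALTER TABLE config_tree ADD COLUMN updated_at REAL",)),
)


def migrate(conn: sqlite3.Connection,
            migrations: Sequence[Tuple[int, Sequence[str]]] = MIGRATIONS) -> int:
    """Apply pending migrations in order, each in its own transaction. Returns count applied."""
    conn.isolation_level = None  # manage BEGIN/COMMIT explicitly on this connection
    conn.execute("CREATE TABLE IF NOT EXISTS schema_migrations "
                 "(version INTEGER PRIMARY KEY, applied_at REAL NOT NULL)")
    current = conn.execute("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").fetchone()[0]
    applied = 0
    for version, statements in sorted(migrations):
        if version <= current:
            continue
        conn.execute("BEGIN IMMEDIATE")
        try:
            for stmt in statements:
                conn.execute(stmt)
            conn.execute("INSERT INTO schema_migrations (version, applied_at) VALUES (?, ?)",
                         (version, time.time()))
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")  # schema and version table stay consistent
            raise
        applied += 1
    return applied
```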

---

### Q3 — `Partial<Pipeline>` Allows Invalid Creates (P3)

`edge/ui/src/lib/api.ts` — `pipelines.create` and `pipelines.update` accept `Partial<Pipeline>`. Required fields (`source`, `sink`, `name`) are not enforced by TypeScript at the call site.

**Fix**: Define `CreatePipelinePayload` with required fields for create, and `UpdatePipelinePayload` (all optional) for update.

---

### Q4 — Edge UI Page File Sizes (P2)

| File | Lines | Multiple of 200-line limit |
|------|-------|--------------|
| `pipeline-editor.tsx` | 1891 | 9.5× |
| `dashboard.tsx` | 1012 | 5× |
| `config.tsx` | 811 | 4× |
| `buffer.tsx` | 692 | 3.5× |
| `node-setup.tsx` | 653 | 3.3× |
| `pipelines.tsx` | 547 | 2.7× |
| `rules.tsx` | 474 | 2.4× |

`pipeline-editor.tsx` at 1891 lines contains drag-and-drop state, stage configuration panels, source/sink pickers, MQTT topic logic, save/delete flows, and import/export — all in one render function. Cannot be unit tested; generates merge conflicts on any concurrent feature work.

---

### Q5 — Silent `.catch(() => {})` Throughout Edge UI (P2)

20+ instances bypass the global `api-error` system:
```ts
api.rules.list().then(setRules).catch(() => {})
api.system.metrics().then(setMetrics).catch(() => {})
api.buffer.stats().then(setStats).catch(() => {})
```

The `api-error` CustomEvent system in `api.ts` is correctly designed. It is bypassed everywhere it matters. Pages show stale or empty data with no user feedback.

**Fix**: Replace `.catch(() => {})` with `useApi()` or let the promise reject naturally to the global handler.

---

### Q6 — Edge Manager UI Page Sizes (P3)

| File | Lines |
|------|-------|
| `pipeline-editor.tsx` | 775 |
| `topology.tsx` | 480 |
| `nodes.tsx` | 470 |
| `pipeline-wizard.tsx` | 467 |
| `templates.tsx` | 397 |

---

## What Is Done Well

**Repository pattern respected**: Zero raw SQL in route handlers across the entire edge gateway. All DB access goes through typed repositories with parameterized queries.

**AST-safe expression evaluator** (`pipeline_engine.py`, `rule_types.py`): `eval()` replaced with a restricted AST walker. Correct for user-provided filter expressions on industrial data. The operator dispatch table using Python's `operator` module is clean.

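**Orchestrator per-pipeline locks**: An `asyncio.Lock` per pipeline ID in `_pipeline_locks` prevents concurrent saves of the same pipeline from racing. A thoughtful choice in an async-first codebase, where two save coroutines can otherwise interleave at any `await`.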
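The same idea in TypeScript, as a hypothetical `KeyedLock` that is not part of the codebase: each pipeline ID gets its own promise chain, so saves for one pipeline are serialized while saves for different pipelines still run concurrently:

```ts
// Hypothetical analogue of `_pipeline_locks`: one promise chain per key.
class KeyedLock {
  private readonly tails = new Map<string, Promise<void>>();

  async run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    const current = new Promise<void>((resolve) => {
      release = () => resolve();
    });
    // Later callers for the same key wait until this task releases.
    const tail = previous.then(() => current);
    this.tails.set(key, tail);

    await previous; // wait for every earlier task on this key
    try {
      return await task();
    } finally {
      release();
      // Drop the map entry if nobody queued behind us, so the map does not grow.
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}
```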

**Connector/egress microservice architecture**: Separate containers per pipeline instance, MQTT-only communication, copy-by-value config, a 2-env-var boot contract (`SERVICE_ID` + `GATEWAY_URL`), and desired-to-actual reconciliation by the orchestrator. A principled design that confines PLC-specific failures to the affected pipeline's container.
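A minimal sketch of a fail-fast loader for the boot contract. `loadBootConfig` is hypothetical, and the connectors themselves may not be written in TypeScript; the point is that a container with a missing or malformed variable should exit at startup rather than run half-configured. A caller would pass `process.env`:

```ts
// Hypothetical loader for the SERVICE_ID + GATEWAY_URL boot contract.
interface BootConfig {
  serviceId: string;
  gatewayUrl: URL;
}

function loadBootConfig(env: Readonly<Record<string, string | undefined>>): BootConfig {
  const serviceId = env["SERVICE_ID"]?.trim();
  const rawUrl = env["GATEWAY_URL"]?.trim();
  if (!serviceId) throw new Error("SERVICE_ID is required");
  if (!rawUrl) throw new Error("GATEWAY_URL is required");

  let gatewayUrl: URL;
  try {
    gatewayUrl = new URL(rawUrl);
  } catch {
    throw new Error(`GATEWAY_URL is not a valid URL: ${rawUrl}`);
  }
  if (gatewayUrl.protocol !== "http:" && gatewayUrl.protocol !== "https:") {
    throw new Error(`GATEWAY_URL must use http or https: ${rawUrl}`);
  }
  // Assumption: all other configuration reaches the service via the gateway after boot.
  return { serviceId, gatewayUrl };
}
```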

**fasten audit trail**: Wired consistently on all state-changing operations with typed `AuditCode` constants in a separate SQLite audit DB. Pattern is correct; the `"admin"` FIXME is a known placeholder, not a design flaw.

**Event Engine as separate process**: Now runs in its own container with its own port — correct isolation for a component that makes PLC writes.

**Edge-sync 4-step poll cycle**: Each step is independently error-tolerant. A failed heartbeat doesn't block deploy polling. The offline SQLite queue with exponential backoff is well-implemented for a DMZ-crossing service.
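A minimal sketch of a capped exponential backoff schedule of the kind described. The base (1 s), the cap (5 min), and the `QueueItem` fields are assumptions; edge-sync's actual parameters are not stated in this review:

```ts
// Capped exponential backoff: base * 2^attempts, never above maxMs.
// Defaults (1 s base, 5 min cap) are assumptions, not edge-sync's values.
function backoffDelayMs(attempts: number, baseMs = 1_000, maxMs = 300_000): number {
  if (!Number.isInteger(attempts) || attempts < 0) {
    throw new RangeError(`attempts must be a non-negative integer, got ${attempts}`);
  }
  return Math.min(baseMs * 2 ** attempts, maxMs);
}

// Hypothetical row in the offline queue.
interface QueueItem {
  id: string;
  attempts: number; // failed send attempts so far
  lastTriedAt: number; // epoch milliseconds of the last attempt
}

// An item is retried only once its backoff window has elapsed.
function isDue(item: QueueItem, nowMs: number): boolean {
  return nowMs - item.lastTriedAt >= backoffDelayMs(item.attempts);
}
```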

**TimescaleDB dedup index**: `idx_telem_dedup ON telemetry(time, source, tag)` makes MQTT QoS 1 re-delivery idempotent when the index is present (see R7).

**Envelope immutability**: `frozen=True` Pydantic model with `derive()` for lineage tracking. UUID-per-message traceability through the pipeline is correct and industrial-grade.
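A TypeScript mirror of the pattern, as a hedged sketch: the field names (`id`, `parentId`, `tag`, `value`) are assumptions rather than the Pydantic model's, and `Object.freeze` plays the role of `frozen=True` (it is shallow, which is sufficient here because every field is a primitive). It uses Node's `randomUUID` for message IDs:

```ts
import { randomUUID } from "node:crypto";

// Hypothetical envelope fields; the real model's names may differ.
interface Envelope {
  readonly id: string; // one UUID per message
  readonly parentId: string | null; // lineage: the envelope this one was derived from
  readonly tag: string;
  readonly value: number;
}

function createEnvelope(tag: string, value: number): Envelope {
  return Object.freeze({ id: randomUUID(), parentId: null, tag, value });
}

// derive(): never mutate the parent; return a new frozen envelope with a
// fresh id that points back at the parent's id.
function derive(parent: Envelope, changes: Partial<Pick<Envelope, "tag" | "value">>): Envelope {
  return Object.freeze({ ...parent, ...changes, id: randomUUID(), parentId: parent.id });
}
```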

**Go SDK functional options pattern**: `WithTimeout`, `WithHTTPClient`, `WithAPIKey` are idiomatic. `url.PathEscape()` on all path parameters prevents path traversal.

**Edge Manager UI service layer**: `useApi`/`useMutation` hooks, typed `ApiError` class, service layer separating domain from HTTP concerns. Noticeably more disciplined than the Edge UI's direct `api.*` calls.
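A minimal sketch of what a typed `ApiError` gives the hooks: callers can branch on `status` instead of parsing error strings. The class fields, `ResponseLike`, and `parseOrThrow` are hypothetical; the real class in the Edge Manager UI may carry different fields:

```ts
// Hypothetical typed error; the real ApiError's fields may differ.
class ApiError extends Error {
  readonly status: number;
  readonly path: string;

  constructor(status: number, path: string, message: string) {
    super(message);
    this.name = "ApiError";
    this.status = status;
    this.path = path;
    // Keeps `instanceof ApiError` working if compiled to an ES5 target.
    Object.setPrototypeOf(this, new.target.prototype);
  }

  get isClientError(): boolean {
    return this.status >= 400 && this.status < 500;
  }
}

// Minimal response shape so the sketch runs without fetch;
// a real fetch Response satisfies it structurally.
interface ResponseLike {
  ok: boolean;
  status: number;
  statusText: string;
  json(): Promise<unknown>;
}

// Service-layer helper: non-2xx responses become ApiError; the rest are decoded.
async function parseOrThrow<T>(path: string, res: ResponseLike): Promise<T> {
  if (!res.ok) {
    throw new ApiError(res.status, path, `${path}: ${res.status} ${res.statusText}`);
  }
  return (await res.json()) as T;
}
```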

**TimescaleDB hot-connect**: The gateway now retries its TimescaleDB connection every 30 s while `_pool is None`, so the buffer profile can be started after the gateway without a gateway restart.
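A minimal sketch of the hot-connect loop, assuming an injected `connect` function; `HotConnector` and `Pool` are hypothetical stand-ins for the gateway's Python implementation. As in the description, it only retries while no pool exists:

```ts
// Hypothetical pool handle.
interface Pool {
  close(): Promise<void>;
}

class HotConnector {
  private pool: Pool | null = null;
  private connecting = false;
  private timer: ReturnType<typeof setInterval> | null = null;
  private readonly connect: () => Promise<Pool>;
  private readonly intervalMs: number;

  constructor(connect: () => Promise<Pool>, intervalMs = 30_000) {
    this.connect = connect;
    this.intervalMs = intervalMs;
  }

  get connected(): boolean {
    return this.pool !== null;
  }

  // One attempt; a no-op once a pool exists or while an attempt is in flight.
  async tryConnect(): Promise<boolean> {
    if (this.pool !== null) return true;
    if (this.connecting) return false;
    this.connecting = true;
    try {
      this.pool = await this.connect();
      return true;
    } catch {
      return false; // DB still unavailable (e.g. buffer profile not started); next tick retries
    } finally {
      this.connecting = false;
    }
  }

  // Try immediately, then on every interval tick until stop() is called.
  start(): void {
    if (this.timer !== null) return;
    void this.tryConnect();
    this.timer = setInterval(() => void this.tryConnect(), this.intervalMs);
  }

  stop(): void {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }
}
```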

---

## Prioritised Fix Order

### P0 — Before Any External Deployment

1. **R1** — Fix Postgres store error handling (one-line fix per method, massive data integrity impact)
2. **S1/S2/S8** — Edge Manager auth middleware + API key validation + heartbeat identity binding
3. **S3** — MQTT broker auth, ACLs, and TLS

### P1 — Next Sprint (Core Correctness + OT Safety)

4. **R2** — MQTT reconnect loop in EventBus with health status reporting
5. **OT1** — Quality-aware rule evaluation — block Quality.BAD from triggering actions
6. **C3** — Populate `previous_value` in RuleContext — DeadbandRule is a core industrial pattern
7. **C4** — Wire `sub_rules` into ChainRule at engine registration time
8. **OT2** — SetpointAction range/type validation before PLC write
9. **R6** — S7 stale connection: explicit reconnect + Quality.BAD publication on failure
10. **C1** — Move deploy logic into asynq handler or remove queue
11. **S4** — Ingest endpoint auth + rate limiting
12. **R4** — Queue drain: continue on 4xx, break only on transport failure
13. **R5** — Config tree delete recursive CTE
14. **C6** — Fix direct-mode claim flow (`ClaimedAt` at registration)
15. **C2** — Implement webhook in inline pipeline stage or remove from UI

### P2 — Backlog

16. **C5** — ThresholdRule operator dispatch
17. **OT3** — S7 value=0 on error (after OT1 quality fix)
18. **R3** — Buffer pressure signal to pipeline engine
19. **R7** — TimescaleDB dedup index fail-loud
20. **C10** — Buffer pagination cursor fix
21. **P1/P2** — Orchestrator O(N) query + N+1 location path
22. **SDK1** — Fix server response shape (remove double-decode)
23. **SDK2** — Pagination on all list endpoints
24. **Q1/Q2** — Migration versioning for both products
25. **C7/C8/C9** — File upload, useApi stability, JSON unmarshal errors
26. **Q4/Q5** — Decompose large UI files + replace silent catch
27. **S6** — Omit ClaimCode from list responses
28. **S7** — SMTP credential encryption or env-var reference
