All files / emulator / emulator / ws-emulator.ts

30.00% Branches 3/10
79.83% Lines 95/119
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
 
 
 
 
 
x2
x8
x6
 
x6
x4
x4
 
x2
x4
x109
 
 
x479
x479
 
x479
x479
x479
x495
x495
x1980
 
 
 
x495
 
 
 
x479
x4
x4
 
x2
x10
x10
x17
x17
x10
x10
 
 
 
 
 
 
 
x2
x2
x2
x18
 
x18
x18
 
x2
x4
x4
x2
 
x2
x2
x4
x4
x4
x4
 
x4
x2
 
x2
x4
x4
 
x2
x4
x4
 
 
x4
x6
x6
x6
x6
x4
x4
 
x2
x78
x26
x26
x26
 
x2
x10
x10
x10
x10
x10
x18
x18
x18
x54
x18
 
x10
x10
x10
 
 
 
 
 
 
 
 
x2
 
x2
 
x2
x4
x4
 
x4
 
x6
x6
 
 
 
x4
x4
x4
 
x2
x4
x4
x4
x4
x4















I










I



I
























I

























I















































I

I












// Copyright (c) 2025 Anton A Nesterov <an+vski@vski.sh>, VSKI License
//

import { EmulatorWorkflowService } from "./service.ts";

/**
 * In-process stand-in for the workflow WebSocket endpoint.
 *
 * Keeps track of which emulated sockets subscribed to which queue and, every
 * 200 ms, asks the workflow service for one job per subscribed queue. A job is
 * pushed to one randomly chosen OPEN subscriber as a `{ event: "JOB" }`
 * message; if no subscriber is available it is handed back via `service.nack`.
 *
 * The polling timer is started by the constructor; call {@link stop} to
 * release it.
 */
export class EmulatorWebSocketServer {
  /**
   * Subscribers keyed by `${db}:${queueName}`. The key is split on its FIRST
   * colon when polling, so queue names may contain colons but database names
   * must not.
   */
  private subscriptions = new Map<string, Set<EmulatorWebSocket>>();
  private pollingInterval: ReturnType<typeof setInterval> | null = null;
  /** True while a polling pass is still awaiting the service. */
  private pollInFlight = false;

  /**
   * @param service Workflow service used to poll queues and nack jobs.
   */
  constructor(private service: EmulatorWorkflowService) {
    this.startPolling();
  }

  /** Starts the 200 ms polling timer. */
  private startPolling() {
    this.pollingInterval = setInterval(() => {
      // pollOnce() handles its own errors; `void` marks the promise as
      // intentionally not awaited.
      void this.pollOnce();
    }, 200);
  }

  /**
   * Runs one polling pass over every subscribed queue.
   *
   * A pass is skipped when the previous one has not finished, so a slow
   * service cannot cause overlapping passes that would poll the same queue
   * concurrently.
   */
  private async pollOnce(): Promise<void> {
    if (this.pollInFlight) return;
    this.pollInFlight = true;
    try {
      for (const [scopedQueueName, clients] of this.subscriptions) {
        // Do not pull a job off the queue when nobody can take it right now;
        // polling and immediately nacking would only churn the job.
        if (this.selectWorker(clients) === null) continue;

        const separator = scopedQueueName.indexOf(":");
        const db = scopedQueueName.slice(0, separator);
        const queueName = scopedQueueName.slice(separator + 1);

        try {
          const job = await this.service.poll(db, queueName);
          if (job) {
            // Re-select after the await: the subscriber set may have changed
            // while the poll was pending.
            const worker = this.selectWorker(clients);
            if (worker) {
              worker.receive(JSON.stringify({ event: "JOB", data: job }));
            } else {
              await this.service.nack(db, job.id);
            }
          }
        } catch (e) {
          console.error(`Error polling queue ${scopedQueueName}:`, e);
        }
      }
    } finally {
      this.pollInFlight = false;
    }
  }

  /**
   * Registers `client` as a subscriber of `queueName` in database `db`.
   * Subscribing the same client twice has no additional effect.
   */
  handleSubscribe(client: EmulatorWebSocket, db: string, queueName: string) {
    const scopedQueueName = `${db}:${queueName}`;
    let clients = this.subscriptions.get(scopedQueueName);
    if (!clients) {
      clients = new Set();
      this.subscriptions.set(scopedQueueName, clients);
    }
    clients.add(client);
  }

  /**
   * Removes `client` from every queue it subscribed to and drops queues that
   * are left without subscribers, so the map does not grow without bound.
   */
  handleDisconnect(client: EmulatorWebSocket) {
    for (const [scopedQueueName, clients] of this.subscriptions) {
      clients.delete(client);
      if (clients.size === 0) {
        this.subscriptions.delete(scopedQueueName);
      }
    }
  }

  /**
   * Picks a random subscriber whose `readyState` is 1 (OPEN).
   *
   * @returns The chosen client, or `null` when none is open.
   */
  private selectWorker(
    clients: Set<EmulatorWebSocket>,
  ): EmulatorWebSocket | null {
    const open = Array.from(clients).filter((c) => c.readyState === 1); // OPEN
    if (open.length === 0) return null;
    return open[Math.floor(Math.random() * open.length)];
  }

  /** Stops the polling timer. Safe to call more than once. */
  stop() {
    if (this.pollingInterval !== null) {
      clearInterval(this.pollingInterval);
      this.pollingInterval = null;
    }
  }
}

/**
 * Minimal in-process replacement for the browser `WebSocket`, wired to an
 * {@link EmulatorWebSocketServer} instead of the network.
 *
 * The socket starts in CONNECTING and moves to OPEN about 10 ms after
 * construction, firing an `open` event. The only client message it
 * understands is `{ event: "SUBSCRIBE", data: { queue } }`. Events are
 * delivered both through `addEventListener` and through the `on*` handler
 * properties.
 */
export class EmulatorWebSocket extends EventTarget {
  static readonly CONNECTING = 0;
  static readonly OPEN = 1;
  static readonly CLOSING = 2;
  static readonly CLOSED = 3;

  readyState: number = EmulatorWebSocket.CONNECTING;
  // Handler slots stay loosely typed so existing callers that assign
  // arbitrary callbacks keep compiling.
  onopen: any = null;
  onmessage: any = null;
  onerror: any = null;
  onclose: any = null;

  /** Server shared by every emulated socket; set through `setServer`. */
  private static server: EmulatorWebSocketServer | undefined;
  private db = "postgres";

  /** Sets the server that all emulated sockets subscribe to. */
  static setServer(server: EmulatorWebSocketServer) {
    EmulatorWebSocket.server = server;
  }

  /**
   * @param url Endpoint URL. Its `db` query parameter selects the database;
   *   a missing or empty value falls back to `"postgres"`.
   * @throws TypeError if `url` is not a valid absolute URL.
   */
  constructor(url: string | URL) {
    super();
    this.db = new URL(url).searchParams.get("db") || "postgres";

    setTimeout(() => {
      // The socket may have been closed before the simulated handshake
      // finished; a closed socket must never flip back to OPEN.
      if (this.readyState !== EmulatorWebSocket.CONNECTING) return;
      this.readyState = EmulatorWebSocket.OPEN;
      const ev = new Event("open");
      this.dispatchEvent(ev);
      if (this.onopen) this.onopen(ev);
    }, 10);
  }

  /**
   * Delivers a server-to-client message: dispatches a `message` event
   * carrying `data` and then calls `onmessage` if set.
   */
  receive(data: string) {
    const ev = new MessageEvent("message", { data });
    this.dispatchEvent(ev);
    if (this.onmessage) this.onmessage(ev);
  }

  /**
   * Handles a client-to-server message. `data` must be a JSON document.
   * A SUBSCRIBE message registers this socket with the server and is
   * acknowledged about 10 ms later with a `SUBSCRIBED` message whose
   * `workflowName` is the queue name with `__wkf_workflow_` removed. Any
   * other message, and anything sent after `close()`, is ignored.
   *
   * @throws SyntaxError if `data` is not valid JSON.
   * @throws TypeError if a SUBSCRIBE message lacks a string `data.queue`.
   * @throws Error if no server has been set.
   */
  send(data: string) {
    if (this.readyState === EmulatorWebSocket.CLOSED) return;

    const msg = JSON.parse(data);
    if (msg?.event !== "SUBSCRIBE") return;

    // Validate before touching the server so a malformed message cannot
    // leave a half-registered subscription behind.
    const queue: unknown = msg.data?.queue;
    if (typeof queue !== "string") {
      throw new TypeError("SUBSCRIBE message requires a string data.queue");
    }
    const server = EmulatorWebSocket.server;
    if (!server) {
      throw new Error(
        "EmulatorWebSocket has no server; call EmulatorWebSocket.setServer() first",
      );
    }

    server.handleSubscribe(this, this.db, queue);
    const workflowName = queue.replace("__wkf_workflow_", "");
    setTimeout(() => {
      // Do not acknowledge on a socket that was closed in the meantime.
      if (this.readyState === EmulatorWebSocket.CLOSED) return;
      this.receive(
        JSON.stringify({
          event: "SUBSCRIBED",
          data: { workflowName },
        }),
      );
    }, 10);
  }

  /**
   * Closes the socket: marks it CLOSED, removes it from the server's
   * subscriptions and fires a single `close` event. Calling it again is a
   * no-op, and it does not fail when no server has been set.
   */
  close() {
    if (this.readyState === EmulatorWebSocket.CLOSED) return;
    this.readyState = EmulatorWebSocket.CLOSED;
    EmulatorWebSocket.server?.handleDisconnect(this);
    // CloseEvent is not a global in every runtime; fall back to a plain Event.
    const ev = typeof CloseEvent === "function"
      ? new CloseEvent("close")
      : new Event("close");
    this.dispatchEvent(ev);
    if (this.onclose) this.onclose(ev);
  }
}

// Value of `globalThis.WebSocket` captured when the emulator was installed.
let originalWS: any = null;
// The patched constructor currently installed, or null when not installed.
let installedWS: any = null;

/**
 * Replaces `globalThis.WebSocket` with a constructor that returns an
 * {@link EmulatorWebSocket} for URLs containing `/api/workflow/ws` or
 * starting with `ws://emulator`, and defers to the previous global
 * `WebSocket` (forwarding all arguments) for everything else.
 *
 * Calling it again while installed only swaps the server; the originally
 * captured `WebSocket` is kept so that uninstalling restores the real one.
 *
 * @param server Server that emulated sockets will subscribe to.
 */
export function installWebSocketEmulator(server: EmulatorWebSocketServer) {
  EmulatorWebSocket.setServer(server);
  if (installedWS !== null) return;

  const nativeWS = (globalThis as any).WebSocket;

  const patchedWS = function (this: any, url: string | URL, ...rest: any[]) {
    // WebSocket accepts URL objects as well as strings.
    const href = String(url);
    if (href.includes("/api/workflow/ws") || href.startsWith("ws://emulator")) {
      return new EmulatorWebSocket(href);
    }
    if (typeof nativeWS !== "function") {
      throw new Error(
        `No native WebSocket available to open non-emulated URL: ${href}`,
      );
    }
    return new nativeWS(url, ...rest);
  };

  if (nativeWS) {
    patchedWS.prototype = nativeWS.prototype;
  }
  // Client code commonly compares `readyState` with `WebSocket.OPEN` etc.;
  // these are the values defined by the WebSocket standard.
  Object.assign(patchedWS, { CONNECTING: 0, OPEN: 1, CLOSING: 2, CLOSED: 3 });

  originalWS = nativeWS;
  installedWS = patchedWS;
  (globalThis as any).WebSocket = patchedWS;
}

/**
 * Restores the `globalThis.WebSocket` that was in place before
 * `installWebSocketEmulator` ran. Does nothing when not installed.
 */
export function uninstallWebSocketEmulator() {
  if (installedWS === null) return;
  (globalThis as any).WebSocket = originalWS;
  originalWS = null;
  installedWS = null;
}