009 multi consumer async

Status: unknown

Code (input.kz)

// ============================================================================
// Multi-Consumer Async: Progressive Disclosure for Threading
// ============================================================================
// Demonstrates SEMANTIC-level concurrency (events) vs TECHNICAL-level (raw Zig):
//
// Level 1 - Simple:     consumeMessages (hides threading)
// Level 2 - Advanced:   consumeMessages.async (returns handle)
// Level 3 - Advanced:   consumeMessages.await (explicit wait)
// Level 4 - Advanced:   consumeMessages.join (wait for multiple)
//
// This test shows 4 consumers processing messages in parallel, all using
// event-level abstractions. Threading is an implementation detail, not part
// of the contract.
//
// Goal: Prove progressive disclosure works for real concurrency
// ============================================================================

const std = @import("std");

// ============================================================================
// MPMC Ring Implementation (vendored from beist-rings)
// ============================================================================

const atomic = std.atomic;
const BUFFER_SIZE = 1024;

fn MpmcRing(comptime T: type, comptime capacity: usize) type {
    if (capacity & (capacity - 1) != 0) {
        @compileError("Ring capacity must be power of 2");
    }

    const CacheLine = 64;

    const Slot = struct {
        seq: atomic.Value(usize),
        value: T,
    };

    return struct {
        const Self = @This();

        head: atomic.Value(usize) align(CacheLine),
        _pad1: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,

        tail: atomic.Value(usize) align(CacheLine),
        _pad2: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,

        slots: [capacity]Slot align(CacheLine),

        pub fn init() Self {
            var self = Self{
                .head = atomic.Value(usize).init(0),
                .tail = atomic.Value(usize).init(0),
                .slots = undefined,
            };

            for (&self.slots, 0..) |*slot, i| {
                slot.seq = atomic.Value(usize).init(i);
                slot.value = undefined;
            }

            return self;
        }

        pub fn tryEnqueue(self: *Self, value: T) bool {
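            // Claim a publish position by advancing `head`, then write the value
            // and release it to consumers by bumping the slot's sequence number.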
            var pos = self.head.load(.monotonic);

            while (true) {
                const slot = &self.slots[pos & (capacity - 1)];
                const seq = slot.seq.load(.acquire);
                const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos));

                if (dif == 0) {
                    if (self.head.cmpxchgWeak(
                        pos,
                        pos + 1,
                        .monotonic,
                        .monotonic,
                    ) == null) {
                        slot.value = value;
                        slot.seq.store(pos + 1, .release);
                        return true;
                    }
                    pos = self.head.load(.monotonic);
                } else if (dif < 0) {
                    return false;
                } else {
                    pos = self.head.load(.monotonic);
                    std.Thread.yield() catch {};
                }
            }
        }

        pub fn tryDequeue(self: *Self) ?T {
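            // Claim a consume position by advancing `tail`; the slot is ready once
            // its sequence number reaches pos + 1 (set by the matching enqueue).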
            var pos = self.tail.load(.monotonic);

            while (true) {
                const slot = &self.slots[pos & (capacity - 1)];
                const seq = slot.seq.load(.acquire);
                const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos + 1));

                if (dif == 0) {
                    if (self.tail.cmpxchgWeak(
                        pos,
                        pos + 1,
                        .monotonic,
                        .monotonic,
                    ) == null) {
                        const value = slot.value;
                        slot.seq.store(pos + capacity, .release);
                        return value;
                    }
                    pos = self.tail.load(.monotonic);
                } else if (dif < 0) {
                    return null;
                } else {
                    pos = self.tail.load(.monotonic);
                    std.Thread.yield() catch {};
                }
            }
        }
    };
}
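
// Usage sketch (single-threaded, illustrative only; not part of the test):
//   var ring = MpmcRing(u64, 8).init();
//   _ = ring.tryEnqueue(42);      // true while the ring has free slots
//   const v = ring.tryDequeue();  // 42 here, or null when the ring is empty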

const Ring = MpmcRing(u64, BUFFER_SIZE);
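// 1,000,000 messages are split evenly across 4 consumers (250,000 each);
// TOTAL_MESSAGES must stay divisible by NUM_CONSUMERS for the split to be exact.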
const TOTAL_MESSAGES: u64 = 1_000_000;
const NUM_CONSUMERS: u64 = 4;
const MESSAGES_PER_CONSUMER: u64 = TOTAL_MESSAGES / NUM_CONSUMERS;

// ============================================================================
// Helper Types for Async Handles
// ============================================================================
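//
// An AsyncHandle pairs a spawned thread with heap storage for its result.
// The thread writes the ConsumeResult before it exits, so the caller must
// join the thread (via .await or .join) before reading result_ptr.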

const ConsumeResult = struct {
    sum: u64,
    count: u64,
};

const AsyncHandle = struct {
    thread: std.Thread,
    result_ptr: *ConsumeResult,
};

// ============================================================================
// Ring Creation
// ============================================================================

~event createRing {}
| created { ring: *Ring }

~proc createRing {
    const ring_storage = std.heap.page_allocator.create(Ring) catch unreachable;
    ring_storage.* = Ring.init();
    return .{ .created = .{ .ring = ring_storage } };
}

// ============================================================================
// Producer: Spawns thread to fill ring with messages
// ============================================================================

~event spawnProducer { ring: *Ring }
| spawned {}

~proc spawnProducer {
    const producer = std.Thread.spawn(.{}, struct {
        fn run(r: *Ring) void {
            var i: u64 = 0;
            while (i < TOTAL_MESSAGES) : (i += 1) {
                while (!r.tryEnqueue(i)) {
                    std.Thread.yield() catch {};
                }
            }
        }
    }.run, .{ring}) catch unreachable;

    producer.detach();
    return .{ .spawned = .{} };
}

// ============================================================================
// Level 1: Simple Consumer (Hides Threading)
// ============================================================================
// This is the simple interface - caller doesn't know about threads.
// Use this when you just want the work done and don't care about the details.
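//
// Caller-side sketch (mirrors the flow syntax at the bottom of this file):
//   ~consumeMessages(ring: r.ring, target: TOTAL_MESSAGES)
//   | completed c |> validate(sum: c.sum, count: c.count)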

~event consumeMessages { ring: *Ring, target: u64 }
| completed { sum: u64, count: u64 }

~proc consumeMessages {
    var sum: u64 = 0;
    var received: u64 = 0;

    while (received < target) {
        if (ring.tryDequeue()) |value| {
            sum += value;
            received += 1;
        } else {
            std.Thread.yield() catch {};
        }
    }

    return .{ .completed = .{ .sum = sum, .count = received } };
}

// ============================================================================
// Level 2: Async Consumer (Returns Handle)
// ============================================================================
// This is the advanced interface - caller gets a handle to async work.
// Threading happens INSIDE the proc, but the event signature is still clean.
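//
// Caller-side sketch: the handle is later consumed by .await or .join:
//   ~consumeMessages.async(ring: r.ring, target: MESSAGES_PER_CONSUMER)
//   | awaitable h |> consumeMessages.await(handle: h.handle)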

~event consumeMessages.async { ring: *Ring, target: u64 }
| awaitable { handle: AsyncHandle }

~proc consumeMessages.async {
    // Allocate result storage (shared between thread and caller)
    const result_storage = std.heap.page_allocator.create(ConsumeResult) catch unreachable;
    result_storage.* = .{ .sum = 0, .count = 0 };

    // Spawn the consumer thread, passing ring, target, and the result slot as thread arguments
    const thread = std.Thread.spawn(.{}, struct {
        fn run(r: *Ring, t: u64, result: *ConsumeResult) void {
            var sum: u64 = 0;
            var received: u64 = 0;

            while (received < t) {
                if (r.tryDequeue()) |value| {
                    sum += value;
                    received += 1;
                } else {
                    std.Thread.yield() catch {};
                }
            }

            result.sum = sum;
            result.count = received;
        }
    }.run, .{ ring, target, result_storage }) catch unreachable;

    return .{ .awaitable = .{ .handle = .{ .thread = thread, .result_ptr = result_storage } } };
}

// ============================================================================
// Level 3: Await Single Handle
// ============================================================================
// Explicitly wait for one async operation to complete.
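// Note: a std.Thread may be joined exactly once, so each handle should flow
// into either .await or .join, never both.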

~event consumeMessages.await { handle: AsyncHandle }
| completed { sum: u64, count: u64 }

~proc consumeMessages.await {
    handle.thread.join();
    const result = handle.result_ptr.*;
    return .{ .completed = .{ .sum = result.sum, .count = result.count } };
}

// ============================================================================
// Level 4: Join Multiple Handles
// ============================================================================
// Wait for multiple async operations to complete.
// This is where the power of progressive disclosure shines!
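// Handles are joined in array order; since every consumer must finish before
// the proc returns, the order has no effect on the combined result.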

~event consumeMessages.join { handles: []const AsyncHandle }
| all_completed { results: []const ConsumeResult }

~proc consumeMessages.join {
    // Allocate results array
    const results = std.heap.page_allocator.alloc(ConsumeResult, handles.len) catch unreachable;

    // Join all threads and collect results
    for (handles, 0..) |h, i| {
        h.thread.join();
        results[i] = h.result_ptr.*;
    }

    return .{ .all_completed = .{ .results = results } };
}

// ============================================================================
// Combine Results from Multiple Consumers
// ============================================================================

~event combineResults { results: []const ConsumeResult }
| combined { total_sum: u64, total_count: u64 }

~proc combineResults {
    var total_sum: u64 = 0;
    var total_count: u64 = 0;

    for (results) |r| {
        total_sum += r.sum;
        total_count += r.count;
    }

    return .{ .combined = .{ .total_sum = total_sum, .total_count = total_count } };
}

// ============================================================================
// Validation
// ============================================================================

~event validate { sum: u64, count: u64 }
| valid {}
| invalid {}

~proc validate {
    const expected_sum: u64 = TOTAL_MESSAGES * (TOTAL_MESSAGES - 1) / 2;
    const expected_count: u64 = TOTAL_MESSAGES;

    if (sum == expected_sum and count == expected_count) {
        std.debug.print("✓ Multi-Consumer: Validated {} messages across {} consumers (checksum: {})\n", .{ count, NUM_CONSUMERS, sum });
        return .{ .valid = .{} };
    } else {
        std.debug.print("✗ Multi-Consumer: MISMATCH! got sum={}, count={}, expected sum={}, count={}\n", .{ sum, count, expected_sum, expected_count });
        return .{ .invalid = .{} };
    }
}

// ============================================================================
// Helper: Start First Consumer (Kicks off the chain)
// ============================================================================

~event startConsumers { ring: *Ring }
| started { handle: AsyncHandle }

~proc startConsumers {
    // Mirrors consumeMessages.async: spawn one consumer thread and return its handle
    const result_storage = std.heap.page_allocator.create(ConsumeResult) catch unreachable;
    result_storage.* = .{ .sum = 0, .count = 0 };

    const thread = std.Thread.spawn(.{}, struct {
        fn run(r: *Ring, t: u64, result: *ConsumeResult) void {
            var sum: u64 = 0;
            var received: u64 = 0;

            while (received < t) {
                if (r.tryDequeue()) |value| {
                    sum += value;
                    received += 1;
                } else {
                    std.Thread.yield() catch {};
                }
            }

            result.sum = sum;
            result.count = received;
        }
    }.run, .{ ring, MESSAGES_PER_CONSUMER, result_storage }) catch unreachable;

    return .{ .started = .{ .handle = .{ .thread = thread, .result_ptr = result_storage } } };
}

// ============================================================================
// Main Flow: 4 Consumers Using Progressive Disclosure
// ============================================================================
// This demonstrates the beautiful composability of the pattern:
// - Start 4 async consumers (each gets a handle)
// - Join all handles (wait for all to complete)
// - Combine results
// - Validate
//
// All at SEMANTIC level (events), not TECHNICAL level (raw threading)!
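// The nested continuations keep h1..h4 in scope, so the innermost step can
// hand all four handles to consumeMessages.join at once.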

~createRing()
| created r |> spawnProducer(ring: r.ring)
    | spawned |> startConsumers(ring: r.ring)
        | started h1 |> consumeMessages.async(ring: r.ring, target: MESSAGES_PER_CONSUMER)
            | awaitable h2 |> consumeMessages.async(ring: r.ring, target: MESSAGES_PER_CONSUMER)
                | awaitable h3 |> consumeMessages.async(ring: r.ring, target: MESSAGES_PER_CONSUMER)
                    | awaitable h4 |> consumeMessages.join(handles: &[_]AsyncHandle{ h1.handle, h2.handle, h3.handle, h4.handle })
                        | all_completed results |> combineResults(results: results.results)
                            | combined c |> validate(sum: c.total_sum, count: c.total_count)
                                | valid |> _
                                | invalid |> _

Expected Output

✓ Multi-Consumer: Validated 1000000 messages across 4 consumers (checksum: 499999500000)
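
The checksum is the sum of the message values 0 through 999,999:
999,999 × 1,000,000 / 2 = 499,999,500,000.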

Test Configuration

MUST_RUN