?
Unknown Status unknown.
Code
// ============================================================================
// Multi-Consumer Async: Progressive Disclosure for Threading
// ============================================================================
// Demonstrates SEMANTIC-level concurrency (events) vs TECHNICAL-level (raw Zig):
//
// Level 1 - Simple: consumeMessages (hides threading)
// Level 2 - Advanced: consumeMessages.async (returns handle)
// Level 3 - Advanced: consumeMessages.await (explicit wait)
// Level 4 - Advanced: consumeMessages.join (wait for multiple)
//
// This test shows 4 consumers processing messages in parallel, all using
// event-level abstractions. Threading is an implementation detail, not part
// of the contract.
//
// Goal: Prove progressive disclosure works for real concurrency
// ============================================================================
const std = @import("std");
// ============================================================================
// MPMC Ring Implementation (vendored from beist-rings)
// ============================================================================
// Shorthand for the standard library's atomics namespace (used as atomic.Value).
const atomic = std.atomic;
// Slot count used when instantiating MpmcRing for this test. MpmcRing rejects
// any capacity that is not a power of two at compile time.
const BUFFER_SIZE = 1024;
// Builds a fixed-capacity ring-buffer queue type holding elements of type `T`.
//
// `capacity` must be a power of two (checked at compile time) so a position can
// be mapped to a slot index with `pos & (capacity - 1)`.
//
// Every slot carries a sequence number (`seq`) that encodes its state relative
// to a position `pos`:
//   seq == pos             -> slot is free for the enqueue at `pos`
//   seq == pos + 1         -> slot holds the value for the dequeue at `pos`
//   seq == pos + capacity  -> slot was drained and is free for the next lap
// `head` is the next enqueue position, `tail` the next dequeue position; both
// are claimed with compare-and-swap.
fn MpmcRing(comptime T: type, comptime capacity: usize) type {
// A power of two has a single bit set, so ANDing with (capacity - 1) gives 0.
if (capacity & (capacity - 1) != 0) {
@compileError("Ring capacity must be power of 2");
}
// Alignment (in bytes) applied to head, tail and the slot array, and the size
// the padding fields are computed against.
const CacheLine = 64;
// One ring cell: the sequence number plus the stored element.
const Slot = struct {
seq: atomic.Value(usize),
value: T,
};
return struct {
const Self = @This();
// Next position to enqueue at.
head: atomic.Value(usize) align(CacheLine),
// Filler sized to the remainder of CacheLine bytes after the atomic counter
// (presumably to keep head and tail apart in memory — confirm intent).
_pad1: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,
// Next position to dequeue from.
tail: atomic.Value(usize) align(CacheLine),
_pad2: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,
slots: [capacity]Slot align(CacheLine),
// Returns an empty ring: head and tail at 0, and slot i's sequence number set
// to i so that slot i is free for the enqueue at position i. Slot values are
// left undefined until written by tryEnqueue.
pub fn init() Self {
var self = Self{
.head = atomic.Value(usize).init(0),
.tail = atomic.Value(usize).init(0),
.slots = undefined,
};
for (&self.slots, 0..) |*slot, i| {
slot.seq = atomic.Value(usize).init(i);
slot.value = undefined;
}
return self;
}
// Attempts to append `value`. Returns true once the value is stored, false if
// the target slot has not been drained yet (ring full). Does not block on a
// full ring; it only retries (with a yield) when it loses a race for a position.
pub fn tryEnqueue(self: *Self, value: T) bool {
var pos = self.head.load(.monotonic);
while (true) {
const slot = &self.slots[pos & (capacity - 1)];
// Acquire pairs with the release store in tryDequeue that frees the slot.
const seq = slot.seq.load(.acquire);
// Signed distance between the slot's sequence number and our position.
// NOTE(review): @intCast to isize is safety-checked, so this assumes
// positions stay below maxInt(isize) — confirm that is acceptable.
const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos));
if (dif == 0) {
// Slot is free for this position: try to claim it by advancing head.
if (self.head.cmpxchgWeak(
pos,
pos + 1,
.monotonic,
.monotonic,
) == null) {
// Claim succeeded. Write the value first, then publish it by
// bumping seq with release ordering so a dequeuer that observes
// seq == pos + 1 also observes the value.
slot.value = value;
slot.seq.store(pos + 1, .release);
return true;
}
// CAS failed (lost the race or spurious failure): reload and retry.
pos = self.head.load(.monotonic);
} else if (dif < 0) {
// seq is behind pos: the slot still holds an undrained value from the
// previous lap, i.e. the ring is full.
return false;
} else {
// seq is ahead of pos: our view of head is stale. Reload, yield, and
// retry.
pos = self.head.load(.monotonic);
std.Thread.yield() catch {};
}
}
}
// Attempts to remove the oldest element. Returns the value, or null if the
// slot at the current tail position has not been published yet (ring empty, or
// an enqueue for that position is still in progress).
pub fn tryDequeue(self: *Self) ?T {
var pos = self.tail.load(.monotonic);
while (true) {
const slot = &self.slots[pos & (capacity - 1)];
// Acquire pairs with the release store in tryEnqueue that publishes the value.
const seq = slot.seq.load(.acquire);
// A published slot for position `pos` has seq == pos + 1.
const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos + 1));
if (dif == 0) {
// Value is ready: try to claim it by advancing tail.
if (self.tail.cmpxchgWeak(
pos,
pos + 1,
.monotonic,
.monotonic,
) == null) {
// Claim succeeded. Copy the value out, then mark the slot free for
// the enqueue one lap later (position pos + capacity).
const value = slot.value;
slot.seq.store(pos + capacity, .release);
return value;
}
// CAS failed (lost the race or spurious failure): reload and retry.
pos = self.tail.load(.monotonic);
} else if (dif < 0) {
// seq is behind pos + 1: nothing has been published for this position.
return null;
} else {
// seq is ahead of pos + 1: our view of tail is stale. Reload, yield, and
// retry.
pos = self.tail.load(.monotonic);
std.Thread.yield() catch {};
}
}
}
};
}
// Concrete queue type used throughout this test: u64 payloads, BUFFER_SIZE slots.
const Ring = MpmcRing(u64, BUFFER_SIZE);
// Number of messages the producer enqueues (the values 0 .. TOTAL_MESSAGES - 1).
const TOTAL_MESSAGES: u64 = 1_000_000;
// Number of consumer threads started by the main flow.
const NUM_CONSUMERS: u64 = 4;
// Quota each consumer drains before it stops.
// @divExact turns an uneven split into a compile error instead of silently
// truncating: with a truncated quota the consumers would collectively drain
// fewer than TOTAL_MESSAGES values, so validation would fail and the detached
// producer would keep spinning on a full ring.
const MESSAGES_PER_CONSUMER: u64 = @divExact(TOTAL_MESSAGES, NUM_CONSUMERS);
// ============================================================================
// Helper Types for Async Handles
// ============================================================================
// Outcome of one consumer's run.
const ConsumeResult = struct {
// Sum of every value the consumer dequeued.
sum: u64,
// Number of values the consumer dequeued.
count: u64,
};
// Handle to a consumer running on its own thread.
const AsyncHandle = struct {
// The spawned consumer thread; joined by consumeMessages.await / .join.
thread: std.Thread,
// Heap-allocated slot the consumer thread writes its totals into just before
// it returns. Read only after `thread` has been joined.
result_ptr: *ConsumeResult,
};
// ============================================================================
// Ring Creation
// ============================================================================
// createRing: allocates and initialises the shared message ring.
//   created.ring - pointer to a heap-allocated, empty Ring. The ring is handed
//                  to producer and consumer threads, so it must outlive this
//                  proc; this proc does not free it.
~event createRing {}
| created { ring: *Ring }
~proc createRing {
    // The event has no failure branch, so an allocation failure cannot be
    // reported to the flow. Panic explicitly rather than `catch unreachable`,
    // which is undefined behaviour in ReleaseFast/ReleaseSmall builds.
    const ring_storage = std.heap.page_allocator.create(Ring) catch
        @panic("createRing: out of memory allocating Ring");
    ring_storage.* = Ring.init();
    return .{ .created = .{ .ring = ring_storage } };
}
// ============================================================================
// Producer: Spawns thread to fill ring with messages
// ============================================================================
// spawnProducer: starts a detached thread that enqueues the values
// 0 .. TOTAL_MESSAGES - 1 into `ring`, in order.
//   ring    - destination ring; must stay valid for as long as the thread runs.
//   spawned - emitted as soon as the thread has been started (not when it ends).
~event spawnProducer { ring: *Ring }
| spawned {}
~proc spawnProducer {
    const producer = std.Thread.spawn(.{}, struct {
        fn run(r: *Ring) void {
            var i: u64 = 0;
            while (i < TOTAL_MESSAGES) : (i += 1) {
                // tryEnqueue returns false while the ring is full: give the
                // consumers a chance to drain it, then retry the same value.
                while (!r.tryEnqueue(i)) {
                    std.Thread.yield() catch {};
                }
            }
        }
    }.run, .{ring}) catch
        // The event has no failure branch; panic explicitly instead of
        // `catch unreachable`, which is undefined behaviour in release-fast builds.
        @panic("spawnProducer: failed to spawn producer thread");
    // Nobody joins the producer, so it is detached here.
    producer.detach();
    return .{ .spawned = .{} };
}
// ============================================================================
// Level 1: Simple Consumer (Hides Threading)
// ============================================================================
// This is the simple interface - caller doesn't know about threads.
// Use this when you just want the work done and don't care about the details.
// consumeMessages: drains exactly `target` values from `ring` on the calling
// thread, yielding whenever the ring is momentarily empty.
//   completed.sum   - sum of the dequeued values
//   completed.count - number of values dequeued (equals `target`)
~event consumeMessages { ring: *Ring, target: u64 }
| completed { sum: u64, count: u64 }
~proc consumeMessages {
    var total: u64 = 0;
    var taken: u64 = 0;
    while (taken < target) {
        // Nothing available right now: let other threads run and poll again.
        const value = ring.tryDequeue() orelse {
            std.Thread.yield() catch {};
            continue;
        };
        total += value;
        taken += 1;
    }
    return .{ .completed = .{ .sum = total, .count = taken } };
}
// ============================================================================
// Level 2: Async Consumer (Returns Handle)
// ============================================================================
// This is the advanced interface - caller gets a handle to async work.
// Threading happens INSIDE the proc, but the event signature is still clean.
// consumeMessages.async: starts a thread that drains `target` values from
// `ring` and returns immediately with a handle to it.
//   awaitable.handle - the thread plus a heap-allocated ConsumeResult that the
//                      thread fills in just before it exits. The result must
//                      only be read after the thread has been joined.
~event consumeMessages.async { ring: *Ring, target: u64 }
| awaitable { handle: AsyncHandle }
~proc consumeMessages.async {
    // Result slot shared between the worker thread (writer) and whoever joins
    // the handle later (reader). The event has no failure branch, so panic
    // explicitly on allocation failure instead of `catch unreachable`, which is
    // undefined behaviour in ReleaseFast/ReleaseSmall builds.
    const result_storage = std.heap.page_allocator.create(ConsumeResult) catch
        @panic("consumeMessages.async: out of memory allocating ConsumeResult");
    result_storage.* = .{ .sum = 0, .count = 0 };

    // The worker is a plain function: ring, target and the result slot are
    // passed to it explicitly as spawn arguments rather than captured.
    const thread = std.Thread.spawn(.{}, struct {
        fn run(r: *Ring, t: u64, result: *ConsumeResult) void {
            var sum: u64 = 0;
            var received: u64 = 0;
            while (received < t) {
                if (r.tryDequeue()) |value| {
                    sum += value;
                    received += 1;
                } else {
                    // Ring momentarily empty: let other threads run, then retry.
                    std.Thread.yield() catch {};
                }
            }
            // Publish totals once, at the end; the joiner reads them after join().
            result.sum = sum;
            result.count = received;
        }
    }.run, .{ ring, target, result_storage }) catch
        @panic("consumeMessages.async: failed to spawn consumer thread");
    return .{ .awaitable = .{ .handle = .{ .thread = thread, .result_ptr = result_storage } } };
}
// ============================================================================
// Level 3: Await Single Handle
// ============================================================================
// Explicitly wait for one async operation to complete.
// consumeMessages.await: blocks until the consumer behind `handle` finishes and
// returns its totals.
//   handle          - consumed by this call: the thread is joined and the
//                     result slot is freed, so the handle (and any copy of it)
//                     must not be used afterwards.
//   completed.sum   - sum of the values that consumer dequeued
//   completed.count - number of values that consumer dequeued
~event consumeMessages.await { handle: AsyncHandle }
| completed { sum: u64, count: u64 }
~proc consumeMessages.await {
    handle.thread.join();
    // Copy the totals out before releasing the slot.
    const result = handle.result_ptr.*;
    // The slot was allocated with page_allocator when the consumer was started;
    // nothing else frees it, so release it here to avoid leaking it.
    std.heap.page_allocator.destroy(handle.result_ptr);
    return .{ .completed = .{ .sum = result.sum, .count = result.count } };
}
// ============================================================================
// Level 4: Join Multiple Handles
// ============================================================================
// Wait for multiple async operations to complete.
// This is where the power of progressive disclosure shines!
// consumeMessages.join: blocks until every consumer in `handles` has finished
// and returns their totals in the same order.
//   handles               - consumed by this call: each thread is joined and
//                           each result slot is freed, so the handles must not
//                           be used afterwards.
//   all_completed.results - slice of handles.len results, allocated with
//                           std.heap.page_allocator; this proc does not free it.
~event consumeMessages.join { handles: []const AsyncHandle }
| all_completed { results: []const ConsumeResult }
~proc consumeMessages.join {
    // The event has no failure branch; panic explicitly on allocation failure
    // instead of `catch unreachable`, which is undefined behaviour in
    // ReleaseFast/ReleaseSmall builds.
    const results = std.heap.page_allocator.alloc(ConsumeResult, handles.len) catch
        @panic("consumeMessages.join: out of memory allocating results");
    for (handles, 0..) |h, i| {
        h.thread.join();
        // Copy the totals out, then release the per-consumer slot that was
        // allocated with page_allocator when the consumer was started; nothing
        // else frees it.
        results[i] = h.result_ptr.*;
        std.heap.page_allocator.destroy(h.result_ptr);
    }
    return .{ .all_completed = .{ .results = results } };
}
// ============================================================================
// Combine Results from Multiple Consumers
// ============================================================================
// combineResults: folds per-consumer results into grand totals.
//   combined.total_sum   - sum of every result's `sum`
//   combined.total_count - sum of every result's `count`
// An empty slice yields zero for both.
~event combineResults { results: []const ConsumeResult }
| combined { total_sum: u64, total_count: u64 }
~proc combineResults {
    var sum_acc: u64 = 0;
    var count_acc: u64 = 0;
    var idx: usize = 0;
    while (idx < results.len) : (idx += 1) {
        const entry = results[idx];
        sum_acc += entry.sum;
        count_acc += entry.count;
    }
    return .{ .combined = .{ .total_sum = sum_acc, .total_count = count_acc } };
}
// ============================================================================
// Validation
// ============================================================================
// validate: checks the combined totals against what the producer sent.
// The producer enqueues 0 .. TOTAL_MESSAGES - 1, so the expected count is
// TOTAL_MESSAGES and the expected sum is TOTAL_MESSAGES * (TOTAL_MESSAGES - 1) / 2.
//   valid   - both totals match; a success line is printed
//   invalid - either total differs; a mismatch line is printed
~event validate { sum: u64, count: u64 }
| valid {}
| invalid {}
~proc validate {
    const want_count: u64 = TOTAL_MESSAGES;
    const want_sum: u64 = TOTAL_MESSAGES * (TOTAL_MESSAGES - 1) / 2;

    // Report the failure case first so the success path reads straight through.
    if (sum != want_sum or count != want_count) {
        std.debug.print("✗ Multi-Consumer: MISMATCH! got sum={}, count={}, expected sum={}, count={}\n", .{ sum, count, want_sum, want_count });
        return .{ .invalid = .{} };
    }

    std.debug.print("✓ Multi-Consumer: Validated {} messages across {} consumers (checksum: {})\n", .{ count, NUM_CONSUMERS, sum });
    return .{ .valid = .{} };
}
// ============================================================================
// Helper: Start First Consumer (Kicks off the chain)
// ============================================================================
// startConsumers: starts the first consumer thread with a fixed quota of
// MESSAGES_PER_CONSUMER values and returns its handle.
//   started.handle - the thread plus a heap-allocated ConsumeResult that the
//                    thread fills in just before it exits. The result must only
//                    be read after the thread has been joined.
~event startConsumers { ring: *Ring }
| started { handle: AsyncHandle }
~proc startConsumers {
    // This proc does not call consumeMessages.async; it carries its own copy of
    // the same spawn logic with the quota fixed to MESSAGES_PER_CONSUMER.
    //
    // The event has no failure branch, so panic explicitly on allocation or
    // spawn failure instead of `catch unreachable`, which is undefined
    // behaviour in ReleaseFast/ReleaseSmall builds.
    const result_storage = std.heap.page_allocator.create(ConsumeResult) catch
        @panic("startConsumers: out of memory allocating ConsumeResult");
    result_storage.* = .{ .sum = 0, .count = 0 };

    const thread = std.Thread.spawn(.{}, struct {
        fn run(r: *Ring, t: u64, result: *ConsumeResult) void {
            var sum: u64 = 0;
            var received: u64 = 0;
            while (received < t) {
                if (r.tryDequeue()) |value| {
                    sum += value;
                    received += 1;
                } else {
                    // Ring momentarily empty: let other threads run, then retry.
                    std.Thread.yield() catch {};
                }
            }
            // Publish totals once, at the end; the joiner reads them after join().
            result.sum = sum;
            result.count = received;
        }
    }.run, .{ ring, MESSAGES_PER_CONSUMER, result_storage }) catch
        @panic("startConsumers: failed to spawn consumer thread");
    return .{ .started = .{ .handle = .{ .thread = thread, .result_ptr = result_storage } } };
}
// ============================================================================
// Main Flow: 4 Consumers Using Progressive Disclosure
// ============================================================================
// This demonstrates the beautiful composability of the pattern:
// - Start 4 async consumers (each gets a handle)
// - Join all handles (wait for all to complete)
// - Combine results
// - Validate
//
// All at SEMANTIC level (events), not TECHNICAL level (raw threading)!
// Flow, step by step:
//   1. createRing allocates the shared ring (bound as r).
//   2. spawnProducer starts the producer thread on r.ring.
//   3. Four consumers are started on r.ring: the first via startConsumers
//      (handle h1), the other three via consumeMessages.async (h2, h3, h4),
//      each with a quota of MESSAGES_PER_CONSUMER.
//   4. consumeMessages.join waits for all four handles and yields their results.
//   5. combineResults sums the per-consumer results.
//   6. validate checks the totals; both of its branches continue to `_`.
~createRing()
| created r |> spawnProducer(ring: r.ring)
| spawned |> startConsumers(ring: r.ring)
| started h1 |> consumeMessages.async(ring: r.ring, target: MESSAGES_PER_CONSUMER)
| awaitable h2 |> consumeMessages.async(ring: r.ring, target: MESSAGES_PER_CONSUMER)
| awaitable h3 |> consumeMessages.async(ring: r.ring, target: MESSAGES_PER_CONSUMER)
| awaitable h4 |> consumeMessages.join(handles: &[_]AsyncHandle{ h1.handle, h2.handle, h3.handle, h4.handle })
| all_completed results |> combineResults(results: results.results)
| combined c |> validate(sum: c.total_sum, count: c.total_count)
| valid |> _
| invalid |> _
Expected Output
✓ Multi-Consumer: Validated 1000000 messages across 4 consumers (checksum: 499999500000)
Test Configuration
MUST_RUN