010 threading library

○ Planned This feature is planned but not yet implemented.

Feature: Threading library support

Code

// ============================================================================
// Threading Library Demo: Using $std/threading
// ============================================================================
// Demonstrates using the generic threading library to solve the same
// multi-consumer problem as test 2005, but with REUSABLE primitives!
//
// This proves that progressive disclosure is not just a one-off pattern,
// but a GENERAL SOLUTION for expressing concurrency in Koru.
//
// Pattern:
// 1. Define work context (contains input AND output)
// 2. Define work function (takes context, writes result)
// 3. Use worker.spawn.async to spawn workers
// 4. Use worker.spawn.join to wait for all workers
// 5. Extract results from contexts
//
// All threading at SEMANTIC level (events from $std/threading)!
// ============================================================================

const std = @import("std");
~import $std/threading

// ============================================================================
// MPMC Ring Implementation (same as test 2005)
// ============================================================================

const atomic = std.atomic;
const BUFFER_SIZE = 1024;

fn MpmcRing(comptime T: type, comptime capacity: usize) type {
    if (capacity & (capacity - 1) != 0) {
        @compileError("Ring capacity must be power of 2");
    }

    const CacheLine = 64;

    const Slot = struct {
        seq: atomic.Value(usize),
        value: T,
    };

    return struct {
        const Self = @This();

        head: atomic.Value(usize) align(CacheLine),
        _pad1: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,

        tail: atomic.Value(usize) align(CacheLine),
        _pad2: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,

        slots: [capacity]Slot align(CacheLine),

        pub fn init() Self {
            var self = Self{
                .head = atomic.Value(usize).init(0),
                .tail = atomic.Value(usize).init(0),
                .slots = undefined,
            };

            for (&self.slots, 0..) |*slot, i| {
                slot.seq = atomic.Value(usize).init(i);
                slot.value = undefined;
            }

            return self;
        }

        pub fn tryEnqueue(self: *Self, value: T) bool {
            var pos = self.head.load(.monotonic);

            while (true) {
                const slot = &self.slots[pos & (capacity - 1)];
                const seq = slot.seq.load(.acquire);
                const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos));

                if (dif == 0) {
                    if (self.head.cmpxchgWeak(
                        pos,
                        pos + 1,
                        .monotonic,
                        .monotonic,
                    ) == null) {
                        slot.value = value;
                        slot.seq.store(pos + 1, .release);
                        return true;
                    }
                    pos = self.head.load(.monotonic);
                } else if (dif < 0) {
                    return false;
                } else {
                    pos = self.head.load(.monotonic);
                    std.Thread.yield() catch {};
                }
            }
        }

        pub fn tryDequeue(self: *Self) ?T {
            var pos = self.tail.load(.monotonic);

            while (true) {
                const slot = &self.slots[pos & (capacity - 1)];
                const seq = slot.seq.load(.acquire);
                const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos + 1));

                if (dif == 0) {
                    if (self.tail.cmpxchgWeak(
                        pos,
                        pos + 1,
                        .monotonic,
                        .monotonic,
                    ) == null) {
                        const value = slot.value;
                        slot.seq.store(pos + capacity, .release);
                        return value;
                    }
                    pos = self.tail.load(.monotonic);
                } else if (dif < 0) {
                    return null;
                } else {
                    pos = self.tail.load(.monotonic);
                    std.Thread.yield() catch {};
                }
            }
        }
    };
}

const Ring = MpmcRing(u64, BUFFER_SIZE);
const TOTAL_MESSAGES: u64 = 1_000_000;
const NUM_CONSUMERS: u64 = 4;
const MESSAGES_PER_CONSUMER: u64 = TOTAL_MESSAGES / NUM_CONSUMERS;

// ============================================================================
// Worker Context and Function
// ============================================================================
// This is the key difference from test 2005:
// We define a context that works with the generic threading library!

const ConsumerContext = struct {
    // Inputs (immutable, set before spawning)
    ring: *Ring,
    target: u64,

    // Outputs (mutable, written by worker)
    sum: u64,
    count: u64,
};

// Generic work function that matches WorkFn signature
fn consumerWorker(ctx: *anyopaque) *anyopaque {
    const consumer_ctx = @as(*ConsumerContext, @ptrCast(@alignCast(ctx)));

    var sum: u64 = 0;
    var received: u64 = 0;

    while (received < consumer_ctx.target) {
        if (consumer_ctx.ring.tryDequeue()) |value| {
            sum += value;
            received += 1;
        } else {
            std.Thread.yield() catch {};
        }
    }

    // Write results to context
    consumer_ctx.sum = sum;
    consumer_ctx.count = received;

    return ctx;
}

// ============================================================================
// Ring and Producer Setup (same as test 2005)
// ============================================================================

~event createRing {}
| created { ring: *Ring }

~proc createRing {
    const ring_storage = std.heap.page_allocator.create(Ring) catch unreachable;
    ring_storage.* = Ring.init();
    return .{ .created = .{ .ring = ring_storage } };
}

~event spawnProducer { ring: *Ring }
| spawned {}

~proc spawnProducer {
    const producer = std.Thread.spawn(.{}, struct {
        fn run(r: *Ring) void {
            var i: u64 = 0;
            while (i < TOTAL_MESSAGES) : (i += 1) {
                while (!r.tryEnqueue(i)) {
                    std.Thread.yield() catch {};
                }
            }
        }
    }.run, .{ring}) catch unreachable;

    producer.detach();
    return .{ .spawned = .{} };
}

// ============================================================================
// Consumer Spawning Using Threading Library
// ============================================================================

~event spawnConsumer { ring: *Ring, target: u64 }
| spawned { context: *ConsumerContext }

~proc spawnConsumer {
    // Allocate context on heap (must outlive this function)
    const ctx = std.heap.page_allocator.create(ConsumerContext) catch unreachable;
    ctx.* = .{
        .ring = ring,
        .target = target,
        .sum = 0,
        .count = 0,
    };

    return .{ .spawned = .{ .context = ctx } };
}

// ============================================================================
// Result Processing
// ============================================================================

~event combineResults { contexts: []*ConsumerContext }
| combined { total_sum: u64, total_count: u64 }

~proc combineResults {
    var total_sum: u64 = 0;
    var total_count: u64 = 0;

    for (contexts) |ctx| {
        total_sum += ctx.sum;
        total_count += ctx.count;
    }

    return .{ .combined = .{ .total_sum = total_sum, .total_count = total_count } };
}

~event validate { sum: u64, count: u64 }
| valid {}
| invalid {}

~proc validate {
    const expected_sum: u64 = TOTAL_MESSAGES * (TOTAL_MESSAGES - 1) / 2;
    const expected_count: u64 = TOTAL_MESSAGES;

    if (sum == expected_sum and count == expected_count) {
        std.debug.print("✓ Threading Library: Validated {} messages across {} consumers (checksum: {})\n", .{ count, NUM_CONSUMERS, sum });
        return .{ .valid = .{} };
    } else {
        std.debug.print("✗ Threading Library: MISMATCH! got sum={}, count={}, expected sum={}, count={}\n", .{ sum, count, expected_sum, expected_count });
        return .{ .invalid = .{} };
    }
}

// ============================================================================
// Helper Events: Wrap context casting in events
// ============================================================================

~event castToOpaque { context: *ConsumerContext }
| casted { ptr: *anyopaque }

~proc castToOpaque {
    return .{ .casted = .{ .ptr = @as(*anyopaque, @ptrCast(context)) } };
}

~event extractContexts { results: []*anyopaque }
| extracted { contexts: []*ConsumerContext }

~proc extractContexts {
    const contexts = std.heap.page_allocator.alloc(*ConsumerContext, results.len) catch unreachable;

    for (results, 0..) |result, i| {
        contexts[i] = @as(*ConsumerContext, @ptrCast(@alignCast(result)));
    }

    return .{ .extracted = .{ .contexts = contexts } };
}

// ============================================================================
// Main Flow: Using $std/threading!
// ============================================================================
// This is the beautiful part - we use the LIBRARY, not inline implementations!
//
// Pattern:
// 1. Create 4 consumer contexts
// 2. Use threading:worker.spawn.async to spawn each with consumerWorker function
// 3. Use threading:worker.spawn.join to wait for all 4
// 4. Extract contexts and combine results
//
// All threading primitives come from the LIBRARY! 🎉

// Helper to collect handles into array
~event collectHandles { h1: threading:WorkerHandle, h2: threading:WorkerHandle, h3: threading:WorkerHandle, h4: threading:WorkerHandle }
| collected { handles: []const threading:WorkerHandle }

~proc collectHandles {
    const handles = std.heap.page_allocator.alloc(threading:WorkerHandle, 4) catch unreachable;
    handles[0] = h1;
    handles[1] = h2;
    handles[2] = h3;
    handles[3] = h4;
    return .{ .collected = .{ .handles = handles } };
}

~createRing()
| created r |> spawnProducer(ring: r.ring)
    | spawned |> spawnConsumer(ring: r.ring, target: MESSAGES_PER_CONSUMER)
        | spawned ctx1 |> castToOpaque(context: ctx1.context)
            | casted c1 |> threading:worker.spawn.async(work_fn: consumerWorker, context: c1.ptr)
                | awaitable h1 |> spawnConsumer(ring: r.ring, target: MESSAGES_PER_CONSUMER)
                    | spawned ctx2 |> castToOpaque(context: ctx2.context)
                        | casted c2 |> threading:worker.spawn.async(work_fn: consumerWorker, context: c2.ptr)
                            | awaitable h2 |> spawnConsumer(ring: r.ring, target: MESSAGES_PER_CONSUMER)
                                | spawned ctx3 |> castToOpaque(context: ctx3.context)
                                    | casted c3 |> threading:worker.spawn.async(work_fn: consumerWorker, context: c3.ptr)
                                        | awaitable h3 |> spawnConsumer(ring: r.ring, target: MESSAGES_PER_CONSUMER)
                                            | spawned ctx4 |> castToOpaque(context: ctx4.context)
                                                | casted c4 |> threading:worker.spawn.async(work_fn: consumerWorker, context: c4.ptr)
                                                    | awaitable h4 |> collectHandles(h1: h1.handle, h2: h2.handle, h3: h3.handle, h4: h4.handle)
                                                        | collected handles |> threading:worker.spawn.join(handles: handles.handles)
                                                            | all_completed results |> extractContexts(results: results.results)
                                                                | extracted e |> combineResults(contexts: e.contexts)
                                                                    | combined c |> validate(sum: c.total_sum, count: c.total_count)
                                                                        | valid |> _
                                                                        | invalid |> _
input.kz

Expected Output

✓ Threading Library: Validated 1000000 messages across 4 consumers (checksum: 499999500000)

Test Configuration

MUST_RUN