○
Planned This feature is planned but not yet implemented.
Feature: Threading library support
Code
// ============================================================================
// Threading Library Demo: Using $std/threading
// ============================================================================
// Demonstrates using the generic threading library to solve the same
// multi-consumer problem as test 2005, but with REUSABLE primitives!
//
// This proves that progressive disclosure is not just a one-off pattern,
// but a GENERAL SOLUTION for expressing concurrency in Koru.
//
// Pattern:
// 1. Define work context (contains input AND output)
// 2. Define work function (takes context, writes result)
// 3. Use worker.spawn.async to spawn workers
// 4. Use worker.spawn.join to wait for all workers
// 5. Extract results from contexts
//
// All threading at SEMANTIC level (events from $std/threading)!
// ============================================================================
const std = @import("std");
~import $std/threading
// ============================================================================
// MPMC Ring Implementation (same as test 2005)
// ============================================================================
const atomic = std.atomic;
const BUFFER_SIZE = 1024;
fn MpmcRing(comptime T: type, comptime capacity: usize) type {
if (capacity & (capacity - 1) != 0) {
@compileError("Ring capacity must be power of 2");
}
const CacheLine = 64;
const Slot = struct {
seq: atomic.Value(usize),
value: T,
};
return struct {
const Self = @This();
head: atomic.Value(usize) align(CacheLine),
_pad1: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,
tail: atomic.Value(usize) align(CacheLine),
_pad2: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,
slots: [capacity]Slot align(CacheLine),
pub fn init() Self {
var self = Self{
.head = atomic.Value(usize).init(0),
.tail = atomic.Value(usize).init(0),
.slots = undefined,
};
for (&self.slots, 0..) |*slot, i| {
slot.seq = atomic.Value(usize).init(i);
slot.value = undefined;
}
return self;
}
pub fn tryEnqueue(self: *Self, value: T) bool {
var pos = self.head.load(.monotonic);
while (true) {
const slot = &self.slots[pos & (capacity - 1)];
const seq = slot.seq.load(.acquire);
const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos));
if (dif == 0) {
if (self.head.cmpxchgWeak(
pos,
pos + 1,
.monotonic,
.monotonic,
) == null) {
slot.value = value;
slot.seq.store(pos + 1, .release);
return true;
}
pos = self.head.load(.monotonic);
} else if (dif < 0) {
return false;
} else {
pos = self.head.load(.monotonic);
std.Thread.yield() catch {};
}
}
}
pub fn tryDequeue(self: *Self) ?T {
var pos = self.tail.load(.monotonic);
while (true) {
const slot = &self.slots[pos & (capacity - 1)];
const seq = slot.seq.load(.acquire);
const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos + 1));
if (dif == 0) {
if (self.tail.cmpxchgWeak(
pos,
pos + 1,
.monotonic,
.monotonic,
) == null) {
const value = slot.value;
slot.seq.store(pos + capacity, .release);
return value;
}
pos = self.tail.load(.monotonic);
} else if (dif < 0) {
return null;
} else {
pos = self.tail.load(.monotonic);
std.Thread.yield() catch {};
}
}
}
};
}
const Ring = MpmcRing(u64, BUFFER_SIZE);
const TOTAL_MESSAGES: u64 = 1_000_000;
const NUM_CONSUMERS: u64 = 4;
const MESSAGES_PER_CONSUMER: u64 = TOTAL_MESSAGES / NUM_CONSUMERS;
// ============================================================================
// Worker Context and Function
// ============================================================================
// This is the key difference from test 2005:
// We define a context that works with the generic threading library!
const ConsumerContext = struct {
// Inputs (immutable, set before spawning)
ring: *Ring,
target: u64,
// Outputs (mutable, written by worker)
sum: u64,
count: u64,
};
// Generic work function that matches WorkFn signature
fn consumerWorker(ctx: *anyopaque) *anyopaque {
const consumer_ctx = @as(*ConsumerContext, @ptrCast(@alignCast(ctx)));
var sum: u64 = 0;
var received: u64 = 0;
while (received < consumer_ctx.target) {
if (consumer_ctx.ring.tryDequeue()) |value| {
sum += value;
received += 1;
} else {
std.Thread.yield() catch {};
}
}
// Write results to context
consumer_ctx.sum = sum;
consumer_ctx.count = received;
return ctx;
}
// ============================================================================
// Ring and Producer Setup (same as test 2005)
// ============================================================================
~event createRing {}
| created { ring: *Ring }
~proc createRing {
const ring_storage = std.heap.page_allocator.create(Ring) catch unreachable;
ring_storage.* = Ring.init();
return .{ .created = .{ .ring = ring_storage } };
}
~event spawnProducer { ring: *Ring }
| spawned {}
~proc spawnProducer {
const producer = std.Thread.spawn(.{}, struct {
fn run(r: *Ring) void {
var i: u64 = 0;
while (i < TOTAL_MESSAGES) : (i += 1) {
while (!r.tryEnqueue(i)) {
std.Thread.yield() catch {};
}
}
}
}.run, .{ring}) catch unreachable;
producer.detach();
return .{ .spawned = .{} };
}
// ============================================================================
// Consumer Spawning Using Threading Library
// ============================================================================
~event spawnConsumer { ring: *Ring, target: u64 }
| spawned { context: *ConsumerContext }
~proc spawnConsumer {
// Allocate context on heap (must outlive this function)
const ctx = std.heap.page_allocator.create(ConsumerContext) catch unreachable;
ctx.* = .{
.ring = ring,
.target = target,
.sum = 0,
.count = 0,
};
return .{ .spawned = .{ .context = ctx } };
}
// ============================================================================
// Result Processing
// ============================================================================
~event combineResults { contexts: []*ConsumerContext }
| combined { total_sum: u64, total_count: u64 }
~proc combineResults {
var total_sum: u64 = 0;
var total_count: u64 = 0;
for (contexts) |ctx| {
total_sum += ctx.sum;
total_count += ctx.count;
}
return .{ .combined = .{ .total_sum = total_sum, .total_count = total_count } };
}
~event validate { sum: u64, count: u64 }
| valid {}
| invalid {}
~proc validate {
const expected_sum: u64 = TOTAL_MESSAGES * (TOTAL_MESSAGES - 1) / 2;
const expected_count: u64 = TOTAL_MESSAGES;
if (sum == expected_sum and count == expected_count) {
std.debug.print("✓ Threading Library: Validated {} messages across {} consumers (checksum: {})\n", .{ count, NUM_CONSUMERS, sum });
return .{ .valid = .{} };
} else {
std.debug.print("✗ Threading Library: MISMATCH! got sum={}, count={}, expected sum={}, count={}\n", .{ sum, count, expected_sum, expected_count });
return .{ .invalid = .{} };
}
}
// ============================================================================
// Helper Events: Wrap context casting in events
// ============================================================================
~event castToOpaque { context: *ConsumerContext }
| casted { ptr: *anyopaque }
~proc castToOpaque {
return .{ .casted = .{ .ptr = @as(*anyopaque, @ptrCast(context)) } };
}
~event extractContexts { results: []*anyopaque }
| extracted { contexts: []*ConsumerContext }
~proc extractContexts {
const contexts = std.heap.page_allocator.alloc(*ConsumerContext, results.len) catch unreachable;
for (results, 0..) |result, i| {
contexts[i] = @as(*ConsumerContext, @ptrCast(@alignCast(result)));
}
return .{ .extracted = .{ .contexts = contexts } };
}
// ============================================================================
// Main Flow: Using $std/threading!
// ============================================================================
// This is the beautiful part - we use the LIBRARY, not inline implementations!
//
// Pattern:
// 1. Create 4 consumer contexts
// 2. Use threading:worker.spawn.async to spawn each with consumerWorker function
// 3. Use threading:worker.spawn.join to wait for all 4
// 4. Extract contexts and combine results
//
// All threading primitives come from the LIBRARY! 🎉
// Helper to collect handles into array
~event collectHandles { h1: threading:WorkerHandle, h2: threading:WorkerHandle, h3: threading:WorkerHandle, h4: threading:WorkerHandle }
| collected { handles: []const threading:WorkerHandle }
~proc collectHandles {
const handles = std.heap.page_allocator.alloc(threading:WorkerHandle, 4) catch unreachable;
handles[0] = h1;
handles[1] = h2;
handles[2] = h3;
handles[3] = h4;
return .{ .collected = .{ .handles = handles } };
}
~createRing()
| created r |> spawnProducer(ring: r.ring)
| spawned |> spawnConsumer(ring: r.ring, target: MESSAGES_PER_CONSUMER)
| spawned ctx1 |> castToOpaque(context: ctx1.context)
| casted c1 |> threading:worker.spawn.async(work_fn: consumerWorker, context: c1.ptr)
| awaitable h1 |> spawnConsumer(ring: r.ring, target: MESSAGES_PER_CONSUMER)
| spawned ctx2 |> castToOpaque(context: ctx2.context)
| casted c2 |> threading:worker.spawn.async(work_fn: consumerWorker, context: c2.ptr)
| awaitable h2 |> spawnConsumer(ring: r.ring, target: MESSAGES_PER_CONSUMER)
| spawned ctx3 |> castToOpaque(context: ctx3.context)
| casted c3 |> threading:worker.spawn.async(work_fn: consumerWorker, context: c3.ptr)
| awaitable h3 |> spawnConsumer(ring: r.ring, target: MESSAGES_PER_CONSUMER)
| spawned ctx4 |> castToOpaque(context: ctx4.context)
| casted c4 |> threading:worker.spawn.async(work_fn: consumerWorker, context: c4.ptr)
| awaitable h4 |> collectHandles(h1: h1.handle, h2: h2.handle, h3: h3.handle, h4: h4.handle)
| collected handles |> threading:worker.spawn.join(handles: handles.handles)
| all_completed results |> extractContexts(results: results.results)
| extracted e |> combineResults(contexts: e.contexts)
| combined c |> validate(sum: c.total_sum, count: c.total_count)
| valid |> _
| invalid |> _
Expected Output
✓ Threading Library: Validated 1000000 messages across 4 consumers (checksum: 499999500000)
Test Configuration
MUST_RUN