?
Unknown Status unknown.
Code
// ============================================================================
// Koru Implementation: Producer/Consumer with MPMC Ring
// ============================================================================
// Tests idiomatic Koru concurrent programming:
// - MPMC ring created in Koru events
// - Producer spawned via events
// - Consumer loop expressed as #label/@label jumps
// - All control flow visible as event transitions
//
// Goal: Prove Koru's abstractions are zero-cost (match Zig baseline)
// ============================================================================
const std = @import("std");
// ============================================================================
// MPMC Ring Implementation (vendored from beist-rings)
// ============================================================================
const atomic = std.atomic;
const BUFFER_SIZE = 1024;
fn MpmcRing(comptime T: type, comptime capacity: usize) type {
if (capacity & (capacity - 1) != 0) {
@compileError("Ring capacity must be power of 2");
}
const CacheLine = 64;
const Slot = struct {
seq: atomic.Value(usize),
value: T,
};
return struct {
const Self = @This();
head: atomic.Value(usize) align(CacheLine),
_pad1: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,
tail: atomic.Value(usize) align(CacheLine),
_pad2: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,
slots: [capacity]Slot align(CacheLine),
pub fn init() Self {
var self = Self{
.head = atomic.Value(usize).init(0),
.tail = atomic.Value(usize).init(0),
.slots = undefined,
};
for (&self.slots, 0..) |*slot, i| {
slot.seq = atomic.Value(usize).init(i);
slot.value = undefined;
}
return self;
}
pub fn tryEnqueue(self: *Self, value: T) bool {
var pos = self.head.load(.monotonic);
while (true) {
const slot = &self.slots[pos & (capacity - 1)];
const seq = slot.seq.load(.acquire);
const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos));
if (dif == 0) {
if (self.head.cmpxchgWeak(
pos,
pos + 1,
.monotonic,
.monotonic,
) == null) {
slot.value = value;
slot.seq.store(pos + 1, .release);
return true;
}
pos = self.head.load(.monotonic);
} else if (dif < 0) {
return false;
} else {
pos = self.head.load(.monotonic);
std.Thread.yield() catch {};
}
}
}
pub fn tryDequeue(self: *Self) ?T {
var pos = self.tail.load(.monotonic);
while (true) {
const slot = &self.slots[pos & (capacity - 1)];
const seq = slot.seq.load(.acquire);
const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos + 1));
if (dif == 0) {
if (self.tail.cmpxchgWeak(
pos,
pos + 1,
.monotonic,
.monotonic,
) == null) {
const value = slot.value;
slot.seq.store(pos + capacity, .release);
return value;
}
pos = self.tail.load(.monotonic);
} else if (dif < 0) {
return null;
} else {
pos = self.tail.load(.monotonic);
std.Thread.yield() catch {};
}
}
}
};
}
const Ring = MpmcRing(u64, BUFFER_SIZE);
const MESSAGES: u64 = 10_000_000;
// ============================================================================
// Koru Events - Ring Operations
// ============================================================================
~event create_ring {}
| created { ring: *Ring }
~proc create_ring {
// Allocate ring on heap (needs to outlive this function)
const ring_storage = std.heap.page_allocator.create(Ring) catch unreachable;
ring_storage.* = Ring.init();
return .{ .created = .{ .ring = ring_storage } };
}
~event spawn_producer { ring: *Ring }
| spawned {}
~proc spawn_producer {
// Spawn producer thread
const producer = std.Thread.spawn(.{}, struct {
fn run(r: *Ring) void {
var i: u64 = 0;
while (i < MESSAGES) : (i += 1) {
while (!r.tryEnqueue(i)) {
std.Thread.yield() catch {};
}
}
}
}.run, .{ring}) catch unreachable;
// Detach - we don't need to join
producer.detach();
return .{ .spawned = .{} };
}
~event dequeue { ring: *Ring }
| some { value: u64 }
| none {}
~proc dequeue {
if (ring.tryDequeue()) |value| {
return .{ .some = .{ .value = value } };
} else {
return .{ .none = .{} };
}
}
~event consume_loop { ring: *Ring, sum: u64, received: u64 }
| continue { sum: u64, received: u64 }
| done { sum: u64 }
~proc consume_loop = dequeue(ring: ring)
| some msg |> check_if_done(sum: sum + msg.value, received: received + 1)
| continue s |> continue { sum: s.sum, received: s.received }
| done s |> done { sum: s.sum }
| none |> yield_then_continue(sum: sum, received: received)
| continue s |> continue { sum: s.sum, received: s.received }
~event check_if_done { sum: u64, received: u64 }
| continue { sum: u64, received: u64 }
| done { sum: u64 }
~proc check_if_done {
if (received >= MESSAGES) {
return .{ .done = .{ .sum = sum } };
} else {
return .{ .continue = .{ .sum = sum, .received = received } };
}
}
~event yield_then_continue { sum: u64, received: u64 }
| continue { sum: u64, received: u64 }
~proc yield_then_continue {
std.Thread.yield() catch {};
return .{ .continue = .{ .sum = sum, .received = received } };
}
~event validate { sum: u64 }
| valid {}
| invalid {}
~proc validate {
const expected: u64 = MESSAGES * (MESSAGES - 1) / 2;
if (sum == expected) {
std.debug.print("✓ Koru: Validated {} messages (checksum: {})\n", .{ MESSAGES, sum });
return .{ .valid = .{} };
} else {
std.debug.print("✗ Koru: CHECKSUM MISMATCH! got {}, expected {}\n", .{ sum, expected });
return .{ .invalid = .{} };
}
}
// ============================================================================
// Main Flow - Idiomatic Koru Consumer Loop
// ============================================================================
~create_ring()
| created r |> spawn_producer(r.ring)
| spawned |> #loop consume_loop(r.ring, sum: 0, received: 0)
| continue s |> @loop(r.ring, s.sum, s.received)
| done s |> validate(s.sum)
| valid |> _
| invalid |> _
Imported Files
// ============================================================================
// Koru Taps Implementation: Pure Event-Based Producer/Consumer
// ============================================================================
// Tests the RAW POWER of event taps:
// - Simple counting loop (the "producer")
// - Tap observes each count and accumulates (the "consumer")
// - Tap observes completion to validate
//
// NO RING. NO CHANNEL. NO SHARED MEMORY STRUCTURE.
// Just events and taps proving zero-cost observation.
// ============================================================================
const std = @import("std");
const MESSAGES: u64 = 10_000_000;
// Global accumulator
var sum: u64 = 0;
// ============================================================================
// The counting loop - this is the "producer"
// ============================================================================
~event count { i: u64 }
| next { value: u64 }
| done {}
~proc count {
if (i >= MESSAGES) {
return .{ .done = .{} };
}
return .{ .next = .{ .value = i } };
}
// ============================================================================
// Void events for tap actions
// ============================================================================
~event accumulate { value: u64 }
~proc accumulate {
sum += value;
}
~event validate {}
~proc validate {
const expected: u64 = MESSAGES * (MESSAGES - 1) / 2;
if (sum == expected) {
std.debug.print("✓ Taps: Validated {} messages (checksum: {})\n", .{ MESSAGES, sum });
} else {
std.debug.print("✗ Taps: CHECKSUM MISMATCH! got {}, expected {}\n", .{ sum, expected });
}
}
// Starting event to kick off the loop
~event start {}
| ready {}
~proc start {
return .{ .ready = .{} };
}
// ============================================================================
// EVENT TAPS - The "consumer" pattern via observation
// ============================================================================
// TAP: Observe count, accumulate on "next"
~count -> * | next v |> accumulate(value: v.value)
// TAP: Observe count completion, validate
~count -> * | done |> validate()
// ============================================================================
// Main Flow - Just a simple loop
// ============================================================================
~start()
| ready |> #loop count(i: 0)
| next n |> @loop(i: n.value + 1)
| done |> _
Test Configuration
Post-validation Script:
#!/bin/bash
# Post-validation: Report performance comparison
#
# Comparing Go vs Zig vs Rust vs Koru vs Koru Taps
# Goal: Prove Koru matches Zig/Rust (zero-cost abstraction!)
set -e
if [ ! -f "results.json" ]; then
echo "⚠️ No benchmark results found (results.json missing)"
echo " Running benchmark..."
bash benchmark.sh
fi
if [ ! -f "results.json" ]; then
echo "❌ FAIL: Benchmark did not produce results.json"
exit 1
fi
# Check if jq is installed
if ! command -v jq &> /dev/null; then
echo "⚠️ jq not installed (needed to parse benchmark results)"
echo " Install with: brew install jq (macOS) or apt install jq (Linux)"
echo " Skipping performance validation..."
exit 0
fi
# Parse results
GO_TIME=$(jq -r '.results[0].mean' results.json)
ZIG_TIME=$(jq -r '.results[1].mean' results.json)
BCHAN_TIME=$(jq -r '.results[2].mean' results.json)
RUST_TIME=$(jq -r '.results[3].mean' results.json)
KORU_TIME=$(jq -r '.results[4].mean' results.json)
TAPS_TIME=$(jq -r '.results[5].mean' results.json)
# Calculate ratios
ZIG_VS_GO=$(echo "scale=4; $ZIG_TIME / $GO_TIME" | bc -l)
BCHAN_VS_GO=$(echo "scale=4; $BCHAN_TIME / $GO_TIME" | bc -l)
BCHAN_VS_ZIG=$(echo "scale=4; $BCHAN_TIME / $ZIG_TIME" | bc -l)
RUST_VS_GO=$(echo "scale=4; $RUST_TIME / $GO_TIME" | bc -l)
RUST_VS_ZIG=$(echo "scale=4; $RUST_TIME / $ZIG_TIME" | bc -l)
KORU_VS_ZIG=$(echo "scale=4; $KORU_TIME / $ZIG_TIME" | bc -l)
KORU_VS_RUST=$(echo "scale=4; $KORU_TIME / $RUST_TIME" | bc -l)
KORU_VS_GO=$(echo "scale=4; $KORU_TIME / $GO_TIME" | bc -l)
TAPS_VS_ZIG=$(echo "scale=4; $TAPS_TIME / $ZIG_TIME" | bc -l)
TAPS_VS_KORU=$(echo "scale=4; $TAPS_TIME / $KORU_TIME" | bc -l)
TAPS_VS_GO=$(echo "scale=4; $TAPS_TIME / $GO_TIME" | bc -l)
echo ""
echo "=========================================="
echo " PERFORMANCE COMPARISON"
echo "=========================================="
echo ""
echo "Go (channels): ${GO_TIME}s"
echo "Zig (MPMC ring): ${ZIG_TIME}s"
echo "bchan (MPSC): ${BCHAN_TIME}s"
echo "Rust (crossbeam): ${RUST_TIME}s"
echo "Koru (events): ${KORU_TIME}s"
echo "Koru (taps): ${TAPS_TIME}s"
echo ""
echo "Ratios:"
echo " Zig/Go: ${ZIG_VS_GO}x"
echo " bchan/Go: ${BCHAN_VS_GO}x"
echo " bchan/Zig: ${BCHAN_VS_ZIG}x"
echo " Rust/Go: ${RUST_VS_GO}x"
echo " Rust/Zig: ${RUST_VS_ZIG}x"
echo " Koru/Zig: ${KORU_VS_ZIG}x"
echo " Koru/Rust: ${KORU_VS_RUST}x"
echo " Koru/Go: ${KORU_VS_GO}x"
echo " Taps/Zig: ${TAPS_VS_ZIG}x"
echo " Taps/Koru: ${TAPS_VS_KORU}x"
echo " Taps/Go: ${TAPS_VS_GO}x"
echo ""
# Interpret: Zig vs Go
echo "Zig vs Go:"
if (( $(echo "$ZIG_VS_GO < 0.95" | bc -l) )); then
IMPROVEMENT=$(echo "scale=1; (1 - $ZIG_VS_GO) * 100" | bc -l)
echo " ✅ Zig is ${IMPROVEMENT}% FASTER than Go"
elif (( $(echo "$ZIG_VS_GO > 1.05" | bc -l) )); then
SLOWDOWN=$(echo "scale=1; ($ZIG_VS_GO - 1) * 100" | bc -l)
echo " ⚠️ Go is ${SLOWDOWN}% faster than Zig"
else
echo " ✅ Roughly equal (within 5%)"
fi
echo ""
# Interpret: Rust vs Go
echo "Rust vs Go:"
if (( $(echo "$RUST_VS_GO < 0.95" | bc -l) )); then
IMPROVEMENT=$(echo "scale=1; (1 - $RUST_VS_GO) * 100" | bc -l)
echo " ✅ Rust is ${IMPROVEMENT}% FASTER than Go"
elif (( $(echo "$RUST_VS_GO > 1.05" | bc -l) )); then
SLOWDOWN=$(echo "scale=1; ($RUST_VS_GO - 1) * 100" | bc -l)
echo " ⚠️ Go is ${SLOWDOWN}% faster than Rust"
else
echo " ✅ Roughly equal (within 5%)"
fi
echo ""
# Interpret: Rust vs Zig
echo "Rust vs Zig:"
if (( $(echo "$RUST_VS_ZIG < 0.95" | bc -l) )); then
IMPROVEMENT=$(echo "scale=1; (1 - $RUST_VS_ZIG) * 100" | bc -l)
echo " ✅ Rust is ${IMPROVEMENT}% FASTER than Zig"
elif (( $(echo "$RUST_VS_ZIG > 1.05" | bc -l) )); then
SLOWDOWN=$(echo "scale=1; ($RUST_VS_ZIG - 1) * 100" | bc -l)
echo " ⚠️ Zig is ${SLOWDOWN}% faster than Rust"
else
echo " ✅ Roughly equal (within 5%)"
fi
echo ""
# Interpret: bchan vs Zig (MPSC vs MPMC comparison)
echo "bchan (MPSC) vs Zig (MPMC):"
if (( $(echo "$BCHAN_VS_ZIG < 0.95" | bc -l) )); then
IMPROVEMENT=$(echo "scale=1; (1 - $BCHAN_VS_ZIG) * 100" | bc -l)
echo " 🚀 bchan is ${IMPROVEMENT}% FASTER than Zig!"
echo " MPSC pattern shows measurable advantage over MPMC"
elif (( $(echo "$BCHAN_VS_ZIG > 1.05" | bc -l) )); then
SLOWDOWN=$(echo "scale=1; ($BCHAN_VS_ZIG - 1) * 100" | bc -l)
echo " ⚠️ Zig MPMC is ${SLOWDOWN}% faster than bchan MPSC"
else
echo " ✅ Roughly equal (within 5%)"
fi
echo ""
# Interpret: Koru vs Zig (CRITICAL!)
echo "Koru vs Zig:"
if (( $(echo "$KORU_VS_ZIG < 1.10" | bc -l) )); then
if (( $(echo "$KORU_VS_ZIG < 1.01" | bc -l) )); then
echo " 🎉 ZERO-COST ABSTRACTION PROVEN!"
echo " Koru matches Zig baseline (<1% overhead)"
else
OVERHEAD=$(echo "scale=1; ($KORU_VS_ZIG - 1) * 100" | bc -l)
echo " ✅ Within threshold (${OVERHEAD}% overhead)"
echo " Koru's abstractions are nearly zero-cost!"
fi
else
OVERHEAD=$(echo "scale=1; ($KORU_VS_ZIG - 1) * 100" | bc -l)
echo " ❌ PERFORMANCE REGRESSION!"
echo " Koru is ${OVERHEAD}% slower than Zig"
echo " This means abstractions have cost - investigate!"
fi
echo ""
# Interpret: Koru vs Go
echo "Koru vs Go:"
if (( $(echo "$KORU_VS_GO < 0.95" | bc -l) )); then
IMPROVEMENT=$(echo "scale=1; (1 - $KORU_VS_GO) * 100" | bc -l)
echo " 🚀 KORU IS ${IMPROVEMENT}% FASTER THAN GO!"
echo " High-level Koru code beats Go's runtime!"
elif (( $(echo "$KORU_VS_GO > 1.05" | bc -l) )); then
SLOWDOWN=$(echo "scale=1; ($KORU_VS_GO - 1) * 100" | bc -l)
echo " ⚠️ Go is ${SLOWDOWN}% faster than Koru"
else
echo " ✅ Roughly equal (within 5%)"
fi
echo ""
# Interpret: Koru Taps vs Zig (THE BIG TEST!)
echo "Koru Taps vs Zig (PURE EVENTS - NO RING!):"
if (( $(echo "$TAPS_VS_ZIG < 0.95" | bc -l) )); then
IMPROVEMENT=$(echo "scale=1; (1 - $TAPS_VS_ZIG) * 100" | bc -l)
echo " 🚀 TAPS ARE ${IMPROVEMENT}% FASTER THAN ZIG RING!"
echo " Event taps OUTPERFORM lock-free data structures!"
elif (( $(echo "$TAPS_VS_ZIG < 1.01" | bc -l) )); then
echo " 🎉 TAPS ACHIEVE ZERO-COST!"
echo " Pure event observation matches Zig MPMC ring!"
elif (( $(echo "$TAPS_VS_ZIG < 1.10" | bc -l) )); then
OVERHEAD=$(echo "scale=1; ($TAPS_VS_ZIG - 1) * 100" | bc -l)
echo " ✅ Within threshold (${OVERHEAD}% overhead)"
echo " Taps are competitive with MPMC rings!"
else
OVERHEAD=$(echo "scale=1; ($TAPS_VS_ZIG - 1) * 100" | bc -l)
echo " ⚠️ Taps are ${OVERHEAD}% slower than Zig ring"
echo " (But remember: NO SHARED MEMORY DATA STRUCTURE!)"
fi
echo ""
# Interpret: Taps vs Koru Events
echo "Koru Taps vs Koru Events:"
if (( $(echo "$TAPS_VS_KORU < 0.95" | bc -l) )); then
IMPROVEMENT=$(echo "scale=1; (1 - $TAPS_VS_KORU) * 100" | bc -l)
echo " 🚀 TAPS ARE ${IMPROVEMENT}% FASTER!"
echo " Pure observation beats ring-wrapping events!"
elif (( $(echo "$TAPS_VS_KORU > 1.05" | bc -l) )); then
SLOWDOWN=$(echo "scale=1; ($TAPS_VS_KORU - 1) * 100" | bc -l)
echo " ⚠️ Ring-based events are ${SLOWDOWN}% faster"
else
echo " ✅ Roughly equal (within 5%)"
fi
echo ""
echo "=========================================="
exit 0