006 rings vs channels

? Unknown Status unknown.

Code

// ============================================================================
// Koru Implementation: Producer/Consumer with MPMC Ring
// ============================================================================
// Tests idiomatic Koru concurrent programming:
// - MPMC ring created in Koru events
// - Producer spawned via events
// - Consumer loop expressed as #label/@label jumps
// - All control flow visible as event transitions
//
// Goal: Prove Koru's abstractions are zero-cost (match Zig baseline)
// ============================================================================

const std = @import("std");

// ============================================================================
// MPMC Ring Implementation (vendored from beist-rings)
// ============================================================================

const atomic = std.atomic;
const BUFFER_SIZE = 1024;

fn MpmcRing(comptime T: type, comptime capacity: usize) type {
    if (capacity & (capacity - 1) != 0) {
        @compileError("Ring capacity must be power of 2");
    }

    const CacheLine = 64;

    const Slot = struct {
        seq: atomic.Value(usize),
        value: T,
    };

    return struct {
        const Self = @This();

        head: atomic.Value(usize) align(CacheLine),
        _pad1: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,

        tail: atomic.Value(usize) align(CacheLine),
        _pad2: [CacheLine - @sizeOf(atomic.Value(usize))]u8 = undefined,

        slots: [capacity]Slot align(CacheLine),

        pub fn init() Self {
            var self = Self{
                .head = atomic.Value(usize).init(0),
                .tail = atomic.Value(usize).init(0),
                .slots = undefined,
            };

            for (&self.slots, 0..) |*slot, i| {
                slot.seq = atomic.Value(usize).init(i);
                slot.value = undefined;
            }

            return self;
        }

        pub fn tryEnqueue(self: *Self, value: T) bool {
            var pos = self.head.load(.monotonic);

            while (true) {
                const slot = &self.slots[pos & (capacity - 1)];
                const seq = slot.seq.load(.acquire);
                const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos));

                if (dif == 0) {
                    if (self.head.cmpxchgWeak(
                        pos,
                        pos + 1,
                        .monotonic,
                        .monotonic,
                    ) == null) {
                        slot.value = value;
                        slot.seq.store(pos + 1, .release);
                        return true;
                    }
                    pos = self.head.load(.monotonic);
                } else if (dif < 0) {
                    return false;
                } else {
                    pos = self.head.load(.monotonic);
                    std.Thread.yield() catch {};
                }
            }
        }

        pub fn tryDequeue(self: *Self) ?T {
            var pos = self.tail.load(.monotonic);

            while (true) {
                const slot = &self.slots[pos & (capacity - 1)];
                const seq = slot.seq.load(.acquire);
                const dif = @as(isize, @intCast(seq)) -% @as(isize, @intCast(pos + 1));

                if (dif == 0) {
                    if (self.tail.cmpxchgWeak(
                        pos,
                        pos + 1,
                        .monotonic,
                        .monotonic,
                    ) == null) {
                        const value = slot.value;
                        slot.seq.store(pos + capacity, .release);
                        return value;
                    }
                    pos = self.tail.load(.monotonic);
                } else if (dif < 0) {
                    return null;
                } else {
                    pos = self.tail.load(.monotonic);
                    std.Thread.yield() catch {};
                }
            }
        }
    };
}

const Ring = MpmcRing(u64, BUFFER_SIZE);
const MESSAGES: u64 = 10_000_000;

// ============================================================================
// Koru Events - Ring Operations
// ============================================================================

~event create_ring {}
| created { ring: *Ring }

~proc create_ring {
    // Allocate ring on heap (needs to outlive this function)
    const ring_storage = std.heap.page_allocator.create(Ring) catch unreachable;
    ring_storage.* = Ring.init();
    return .{ .created = .{ .ring = ring_storage } };
}

~event spawn_producer { ring: *Ring }
| spawned {}

~proc spawn_producer {
    // Spawn producer thread
    const producer = std.Thread.spawn(.{}, struct {
        fn run(r: *Ring) void {
            var i: u64 = 0;
            while (i < MESSAGES) : (i += 1) {
                while (!r.tryEnqueue(i)) {
                    std.Thread.yield() catch {};
                }
            }
        }
    }.run, .{ring}) catch unreachable;

    // Detach - we don't need to join
    producer.detach();

    return .{ .spawned = .{} };
}

~event dequeue { ring: *Ring }
| some { value: u64 }
| none {}

~proc dequeue {
    if (ring.tryDequeue()) |value| {
        return .{ .some = .{ .value = value } };
    } else {
        return .{ .none = .{} };
    }
}

~event consume_loop { ring: *Ring, sum: u64, received: u64 }
| continue { sum: u64, received: u64 }
| done { sum: u64 }

~proc consume_loop = dequeue(ring: ring)
| some msg |> check_if_done(sum: sum + msg.value, received: received + 1)
    | continue s |> continue { sum: s.sum, received: s.received }
    | done s |> done { sum: s.sum }
| none |> yield_then_continue(sum: sum, received: received)
    | continue s |> continue { sum: s.sum, received: s.received }

~event check_if_done { sum: u64, received: u64 }
| continue { sum: u64, received: u64 }
| done { sum: u64 }

~proc check_if_done {
    if (received >= MESSAGES) {
        return .{ .done = .{ .sum = sum } };
    } else {
        return .{ .continue = .{ .sum = sum, .received = received } };
    }
}

~event yield_then_continue { sum: u64, received: u64 }
| continue { sum: u64, received: u64 }

~proc yield_then_continue {
    std.Thread.yield() catch {};
    return .{ .continue = .{ .sum = sum, .received = received } };
}

~event validate { sum: u64 }
| valid {}
| invalid {}

~proc validate {
    const expected: u64 = MESSAGES * (MESSAGES - 1) / 2;
    if (sum == expected) {
        std.debug.print("✓ Koru: Validated {} messages (checksum: {})\n", .{ MESSAGES, sum });
        return .{ .valid = .{} };
    } else {
        std.debug.print("✗ Koru: CHECKSUM MISMATCH! got {}, expected {}\n", .{ sum, expected });
        return .{ .invalid = .{} };
    }
}

// ============================================================================
// Main Flow - Idiomatic Koru Consumer Loop
// ============================================================================

~create_ring()
| created r |> spawn_producer(r.ring)
    | spawned |> #loop consume_loop(r.ring, sum: 0, received: 0)
        | continue s |> @loop(r.ring, s.sum, s.received)
        | done s |> validate(s.sum)
            | valid |> _
            | invalid |> _
input.kz

Imported Files

// ============================================================================
// Koru Taps Implementation: Pure Event-Based Producer/Consumer
// ============================================================================
// Tests the RAW POWER of event taps:
// - Simple counting loop (the "producer")
// - Tap observes each count and accumulates (the "consumer")
// - Tap observes completion to validate
//
// NO RING. NO CHANNEL. NO SHARED MEMORY STRUCTURE.
// Just events and taps proving zero-cost observation.
// ============================================================================

const std = @import("std");

const MESSAGES: u64 = 10_000_000;

// Global accumulator
var sum: u64 = 0;

// ============================================================================
// The counting loop - this is the "producer"
// ============================================================================

~event count { i: u64 }
| next { value: u64 }
| done {}

~proc count {
    if (i >= MESSAGES) {
        return .{ .done = .{} };
    }
    return .{ .next = .{ .value = i } };
}

// ============================================================================
// Void events for tap actions
// ============================================================================

~event accumulate { value: u64 }

~proc accumulate {
    sum += value;
}

~event validate {}

~proc validate {
    const expected: u64 = MESSAGES * (MESSAGES - 1) / 2;
    if (sum == expected) {
        std.debug.print("✓ Taps: Validated {} messages (checksum: {})\n", .{ MESSAGES, sum });
    } else {
        std.debug.print("✗ Taps: CHECKSUM MISMATCH! got {}, expected {}\n", .{ sum, expected });
    }
}

// Starting event to kick off the loop
~event start {}
| ready {}

~proc start {
    return .{ .ready = .{} };
}

// ============================================================================
// EVENT TAPS - The "consumer" pattern via observation
// ============================================================================

// TAP: Observe count, accumulate on "next"
~count -> * | next v |> accumulate(value: v.value)

// TAP: Observe count completion, validate
~count -> * | done |> validate()

// ============================================================================
// Main Flow - Just a simple loop
// ============================================================================

~start()
| ready |> #loop count(i: 0)
    | next n |> @loop(i: n.value + 1)
    | done |> _
input_taps.kz

Test Configuration

Post-validation Script:

#!/bin/bash
# Post-validation: Report performance comparison
#
# Comparing Go vs Zig vs Rust vs Koru vs Koru Taps
# Goal: Prove Koru matches Zig/Rust (zero-cost abstraction!)

set -e

if [ ! -f "results.json" ]; then
    echo "⚠️  No benchmark results found (results.json missing)"
    echo "   Running benchmark..."
    bash benchmark.sh
fi

if [ ! -f "results.json" ]; then
    echo "❌ FAIL: Benchmark did not produce results.json"
    exit 1
fi

# Check if jq is installed
if ! command -v jq &> /dev/null; then
    echo "⚠️  jq not installed (needed to parse benchmark results)"
    echo "   Install with: brew install jq (macOS) or apt install jq (Linux)"
    echo "   Skipping performance validation..."
    exit 0
fi

# Parse results
GO_TIME=$(jq -r '.results[0].mean' results.json)
ZIG_TIME=$(jq -r '.results[1].mean' results.json)
BCHAN_TIME=$(jq -r '.results[2].mean' results.json)
RUST_TIME=$(jq -r '.results[3].mean' results.json)
KORU_TIME=$(jq -r '.results[4].mean' results.json)
TAPS_TIME=$(jq -r '.results[5].mean' results.json)

# Calculate ratios
ZIG_VS_GO=$(echo "scale=4; $ZIG_TIME / $GO_TIME" | bc -l)
BCHAN_VS_GO=$(echo "scale=4; $BCHAN_TIME / $GO_TIME" | bc -l)
BCHAN_VS_ZIG=$(echo "scale=4; $BCHAN_TIME / $ZIG_TIME" | bc -l)
RUST_VS_GO=$(echo "scale=4; $RUST_TIME / $GO_TIME" | bc -l)
RUST_VS_ZIG=$(echo "scale=4; $RUST_TIME / $ZIG_TIME" | bc -l)
KORU_VS_ZIG=$(echo "scale=4; $KORU_TIME / $ZIG_TIME" | bc -l)
KORU_VS_RUST=$(echo "scale=4; $KORU_TIME / $RUST_TIME" | bc -l)
KORU_VS_GO=$(echo "scale=4; $KORU_TIME / $GO_TIME" | bc -l)
TAPS_VS_ZIG=$(echo "scale=4; $TAPS_TIME / $ZIG_TIME" | bc -l)
TAPS_VS_KORU=$(echo "scale=4; $TAPS_TIME / $KORU_TIME" | bc -l)
TAPS_VS_GO=$(echo "scale=4; $TAPS_TIME / $GO_TIME" | bc -l)

echo ""
echo "=========================================="
echo "  PERFORMANCE COMPARISON"
echo "=========================================="
echo ""
echo "Go (channels):        ${GO_TIME}s"
echo "Zig (MPMC ring):      ${ZIG_TIME}s"
echo "bchan (MPSC):         ${BCHAN_TIME}s"
echo "Rust (crossbeam):     ${RUST_TIME}s"
echo "Koru (events):        ${KORU_TIME}s"
echo "Koru (taps):          ${TAPS_TIME}s"
echo ""
echo "Ratios:"
echo "  Zig/Go:       ${ZIG_VS_GO}x"
echo "  bchan/Go:     ${BCHAN_VS_GO}x"
echo "  bchan/Zig:    ${BCHAN_VS_ZIG}x"
echo "  Rust/Go:      ${RUST_VS_GO}x"
echo "  Rust/Zig:     ${RUST_VS_ZIG}x"
echo "  Koru/Zig:     ${KORU_VS_ZIG}x"
echo "  Koru/Rust:    ${KORU_VS_RUST}x"
echo "  Koru/Go:      ${KORU_VS_GO}x"
echo "  Taps/Zig:     ${TAPS_VS_ZIG}x"
echo "  Taps/Koru:    ${TAPS_VS_KORU}x"
echo "  Taps/Go:      ${TAPS_VS_GO}x"
echo ""

# Interpret: Zig vs Go
echo "Zig vs Go:"
if (( $(echo "$ZIG_VS_GO < 0.95" | bc -l) )); then
    IMPROVEMENT=$(echo "scale=1; (1 - $ZIG_VS_GO) * 100" | bc -l)
    echo "  ✅ Zig is ${IMPROVEMENT}% FASTER than Go"
elif (( $(echo "$ZIG_VS_GO > 1.05" | bc -l) )); then
    SLOWDOWN=$(echo "scale=1; ($ZIG_VS_GO - 1) * 100" | bc -l)
    echo "  ⚠️  Go is ${SLOWDOWN}% faster than Zig"
else
    echo "  ✅ Roughly equal (within 5%)"
fi

echo ""

# Interpret: Rust vs Go
echo "Rust vs Go:"
if (( $(echo "$RUST_VS_GO < 0.95" | bc -l) )); then
    IMPROVEMENT=$(echo "scale=1; (1 - $RUST_VS_GO) * 100" | bc -l)
    echo "  ✅ Rust is ${IMPROVEMENT}% FASTER than Go"
elif (( $(echo "$RUST_VS_GO > 1.05" | bc -l) )); then
    SLOWDOWN=$(echo "scale=1; ($RUST_VS_GO - 1) * 100" | bc -l)
    echo "  ⚠️  Go is ${SLOWDOWN}% faster than Rust"
else
    echo "  ✅ Roughly equal (within 5%)"
fi

echo ""

# Interpret: Rust vs Zig
echo "Rust vs Zig:"
if (( $(echo "$RUST_VS_ZIG < 0.95" | bc -l) )); then
    IMPROVEMENT=$(echo "scale=1; (1 - $RUST_VS_ZIG) * 100" | bc -l)
    echo "  ✅ Rust is ${IMPROVEMENT}% FASTER than Zig"
elif (( $(echo "$RUST_VS_ZIG > 1.05" | bc -l) )); then
    SLOWDOWN=$(echo "scale=1; ($RUST_VS_ZIG - 1) * 100" | bc -l)
    echo "  ⚠️  Zig is ${SLOWDOWN}% faster than Rust"
else
    echo "  ✅ Roughly equal (within 5%)"
fi

echo ""

# Interpret: bchan vs Zig (MPSC vs MPMC comparison)
echo "bchan (MPSC) vs Zig (MPMC):"
if (( $(echo "$BCHAN_VS_ZIG < 0.95" | bc -l) )); then
    IMPROVEMENT=$(echo "scale=1; (1 - $BCHAN_VS_ZIG) * 100" | bc -l)
    echo "  🚀 bchan is ${IMPROVEMENT}% FASTER than Zig!"
    echo "  MPSC pattern shows measurable advantage over MPMC"
elif (( $(echo "$BCHAN_VS_ZIG > 1.05" | bc -l) )); then
    SLOWDOWN=$(echo "scale=1; ($BCHAN_VS_ZIG - 1) * 100" | bc -l)
    echo "  ⚠️  Zig MPMC is ${SLOWDOWN}% faster than bchan MPSC"
else
    echo "  ✅ Roughly equal (within 5%)"
fi

echo ""

# Interpret: Koru vs Zig (CRITICAL!)
echo "Koru vs Zig:"
if (( $(echo "$KORU_VS_ZIG < 1.10" | bc -l) )); then
    if (( $(echo "$KORU_VS_ZIG < 1.01" | bc -l) )); then
        echo "  🎉 ZERO-COST ABSTRACTION PROVEN!"
        echo "  Koru matches Zig baseline (<1% overhead)"
    else
        OVERHEAD=$(echo "scale=1; ($KORU_VS_ZIG - 1) * 100" | bc -l)
        echo "  ✅ Within threshold (${OVERHEAD}% overhead)"
        echo "  Koru's abstractions are nearly zero-cost!"
    fi
else
    OVERHEAD=$(echo "scale=1; ($KORU_VS_ZIG - 1) * 100" | bc -l)
    echo "  ❌ PERFORMANCE REGRESSION!"
    echo "  Koru is ${OVERHEAD}% slower than Zig"
    echo "  This means abstractions have cost - investigate!"
fi

echo ""

# Interpret: Koru vs Go
echo "Koru vs Go:"
if (( $(echo "$KORU_VS_GO < 0.95" | bc -l) )); then
    IMPROVEMENT=$(echo "scale=1; (1 - $KORU_VS_GO) * 100" | bc -l)
    echo "  🚀 KORU IS ${IMPROVEMENT}% FASTER THAN GO!"
    echo "  High-level Koru code beats Go's runtime!"
elif (( $(echo "$KORU_VS_GO > 1.05" | bc -l) )); then
    SLOWDOWN=$(echo "scale=1; ($KORU_VS_GO - 1) * 100" | bc -l)
    echo "  ⚠️  Go is ${SLOWDOWN}% faster than Koru"
else
    echo "  ✅ Roughly equal (within 5%)"
fi

echo ""

# Interpret: Koru Taps vs Zig (THE BIG TEST!)
echo "Koru Taps vs Zig (PURE EVENTS - NO RING!):"
if (( $(echo "$TAPS_VS_ZIG < 0.95" | bc -l) )); then
    IMPROVEMENT=$(echo "scale=1; (1 - $TAPS_VS_ZIG) * 100" | bc -l)
    echo "  🚀 TAPS ARE ${IMPROVEMENT}% FASTER THAN ZIG RING!"
    echo "  Event taps OUTPERFORM lock-free data structures!"
elif (( $(echo "$TAPS_VS_ZIG < 1.01" | bc -l) )); then
    echo "  🎉 TAPS ACHIEVE ZERO-COST!"
    echo "  Pure event observation matches Zig MPMC ring!"
elif (( $(echo "$TAPS_VS_ZIG < 1.10" | bc -l) )); then
    OVERHEAD=$(echo "scale=1; ($TAPS_VS_ZIG - 1) * 100" | bc -l)
    echo "  ✅ Within threshold (${OVERHEAD}% overhead)"
    echo "  Taps are competitive with MPMC rings!"
else
    OVERHEAD=$(echo "scale=1; ($TAPS_VS_ZIG - 1) * 100" | bc -l)
    echo "  ⚠️  Taps are ${OVERHEAD}% slower than Zig ring"
    echo "  (But remember: NO SHARED MEMORY DATA STRUCTURE!)"
fi

echo ""

# Interpret: Taps vs Koru Events
echo "Koru Taps vs Koru Events:"
if (( $(echo "$TAPS_VS_KORU < 0.95" | bc -l) )); then
    IMPROVEMENT=$(echo "scale=1; (1 - $TAPS_VS_KORU) * 100" | bc -l)
    echo "  🚀 TAPS ARE ${IMPROVEMENT}% FASTER!"
    echo "  Pure observation beats ring-wrapping events!"
elif (( $(echo "$TAPS_VS_KORU > 1.05" | bc -l) )); then
    SLOWDOWN=$(echo "scale=1; ($TAPS_VS_KORU - 1) * 100" | bc -l)
    echo "  ⚠️  Ring-based events are ${SLOWDOWN}% faster"
else
    echo "  ✅ Roughly equal (within 5%)"
fi

echo ""

echo "=========================================="

exit 0