Skip to content

Commit 32dc46a

Browse files
committed
std.Io: add Group.concurrent
A function that participates in a group but guarantees that one unit of concurrency has been allocated to the task, or returns an error.
1 parent 476d7d9 commit 32dc46a

File tree

3 files changed

+123
-3
lines changed

3 files changed

+123
-3
lines changed

lib/std/Io.zig

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -626,8 +626,9 @@ pub const VTable = struct {
626626
/// Thread-safe.
627627
cancelRequested: *const fn (?*anyopaque) bool,
628628

629-
/// Executes `start` asynchronously in a manner such that it cleans itself
630-
/// up. This mode does not support results, await, or cancel.
629+
/// When this function returns, implementation guarantees that `start` has
630+
/// either already been called, or a unit of concurrency has been assigned
631+
/// to the task of calling the function.
631632
///
632633
/// Thread-safe.
633634
groupAsync: *const fn (
@@ -640,6 +641,17 @@ pub const VTable = struct {
640641
context_alignment: std.mem.Alignment,
641642
start: *const fn (*Group, context: *const anyopaque) void,
642643
) void,
644+
/// Thread-safe.
645+
groupConcurrent: *const fn (
646+
/// Corresponds to `Io.userdata`.
647+
userdata: ?*anyopaque,
648+
/// Owner of the spawned async task.
649+
group: *Group,
650+
/// Copied and then passed to `start`.
651+
context: []const u8,
652+
context_alignment: std.mem.Alignment,
653+
start: *const fn (*Group, context: *const anyopaque) void,
654+
) ConcurrentError!void,
643655
groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
644656
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
645657

@@ -1021,8 +1033,8 @@ pub const Group = struct {
10211033
/// Threadsafe.
10221034
///
10231035
/// See also:
1024-
/// * `Io.async`
10251036
/// * `concurrent`
1037+
/// * `Io.async`
10261038
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
10271039
const Args = @TypeOf(args);
10281040
const TypeErased = struct {
@@ -1035,6 +1047,34 @@ pub const Group = struct {
10351047
io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
10361048
}
10371049

1050+
/// Calls `function` with `args`, such that the function is not guaranteed
1051+
/// to have returned until `wait` is called, allowing the caller to
1052+
/// progress while waiting for any `Io` operations.
1053+
///
1054+
/// The resource spawned is owned by the group; after this is called,
1055+
/// `wait` or `cancel` must be called before the group is deinitialized.
1056+
///
1057+
/// This has stronger guarantee than `async`, placing restrictions on what kind
1058+
/// of `Io` implementations are supported. By calling `async` instead, one
1059+
/// allows, for example, stackful single-threaded blocking I/O.
1060+
///
1061+
/// Threadsafe.
1062+
///
1063+
/// See also:
1064+
/// * `async`
1065+
/// * `Io.concurrent`
1066+
pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
1067+
const Args = @TypeOf(args);
1068+
const TypeErased = struct {
1069+
fn start(group: *Group, context: *const anyopaque) void {
1070+
_ = group;
1071+
const args_casted: *const Args = @ptrCast(@alignCast(context));
1072+
@call(.auto, function, args_casted.*);
1073+
}
1074+
};
1075+
return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
1076+
}
1077+
10381078
/// Blocks until all tasks of the group finish. During this time,
10391079
/// cancellation requests propagate to all members of the group.
10401080
///

lib/std/Io/Threaded.zig

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ pub fn init(
116116
/// * `Io.VTable.async`
117117
/// * `Io.VTable.concurrent`
118118
/// * `Io.VTable.groupAsync`
119+
/// * `Io.VTable.groupConcurrent`
119120
/// If these functions are avoided, then `Allocator.failing` may be passed
120121
/// here.
121122
gpa: Allocator,
@@ -221,6 +222,7 @@ pub fn io(t: *Threaded) Io {
221222
.select = select,
222223

223224
.groupAsync = groupAsync,
225+
.groupConcurrent = groupConcurrent,
224226
.groupWait = groupWait,
225227
.groupCancel = groupCancel,
226228

@@ -317,6 +319,7 @@ pub fn ioBasic(t: *Threaded) Io {
317319
.select = select,
318320

319321
.groupAsync = groupAsync,
322+
.groupConcurrent = groupConcurrent,
320323
.groupWait = groupWait,
321324
.groupCancel = groupCancel,
322325

@@ -729,6 +732,57 @@ fn groupAsync(
729732
t.cond.signal();
730733
}
731734

/// Implements `Io.VTable.groupConcurrent` for `Threaded`: spawns `start` as a
/// member of `group`, guaranteeing a unit of concurrency is reserved for it
/// before returning, or failing with `error.ConcurrencyUnavailable`.
fn groupConcurrent(
    userdata: ?*anyopaque,
    group: *Io.Group,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (*Io.Group, context: *const anyopaque) void,
) Io.ConcurrentError!void {
    // A single-threaded build can never provide true concurrency.
    if (builtin.single_threaded) return error.ConcurrencyUnavailable;

    const t: *Threaded = @ptrCast(@alignCast(userdata));

    // Allocate the closure (which presumably copies `context` — see
    // `GroupClosure.init`) before taking the mutex.
    const gpa = t.allocator;
    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
        return error.ConcurrencyUnavailable;

    // NOTE(review): the error-return paths below (`concurrent_limit` reached,
    // `Thread.spawn` failure) do not appear to free `gc` — confirm ownership
    // against `GroupClosure`'s cleanup path or add an `errdefer` deinit.
    t.mutex.lock();
    defer t.mutex.unlock();

    const busy_count = t.busy_count;

    // Refuse rather than queue: the caller was promised a dedicated unit of
    // concurrency, and all permitted units are already in use.
    if (busy_count >= @intFromEnum(t.concurrent_limit))
        return error.ConcurrencyUnavailable;

    t.busy_count = busy_count + 1;
    errdefer t.busy_count = busy_count;

    // `wait_group.value()` is used as the current pool size; if every pooled
    // thread is busy, spawn a new worker so this task cannot be starved.
    const pool_size = t.wait_group.value();
    if (pool_size - busy_count == 0) {
        t.wait_group.start();
        // Scoped errdefer: only undoes the `start` above if the spawn below
        // fails; it is inert once this block exits successfully.
        errdefer t.wait_group.finish();

        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
            return error.ConcurrencyUnavailable;
        thread.detach();
    }

    // Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe.
    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
    group.token = &gc.node;

    t.run_queue.prepend(&gc.closure.node);

    // Bump the group's pending-task counter (packed into `group.state`).
    // This needs to be done before unlocking the mutex to avoid a race with
    // the associated task finishing.
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic);
    // Guard against the pending counter overflowing into adjacent state bits.
    assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending));

    // Wake one worker to pick up the newly queued closure.
    t.cond.signal();
}
732786
fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
733787
const t: *Threaded = @ptrCast(@alignCast(userdata));
734788
const gpa = t.allocator;

lib/std/Io/test.zig

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,32 @@ fn sleep(io: Io, result: *usize) void {
172172
result.* = 1;
173173
}
174174

// Exercises `Io.Group.concurrent`: spawns two tasks with a guaranteed unit of
// concurrency each, waits for both, and checks their results.
test "Group concurrent" {
    const io = testing.io;

    var group: Io.Group = .init;
    // Reap any already-spawned tasks even if an expectation below fails or
    // the test returns early.
    defer group.cancel(io);
    var results: [2]usize = undefined;

    group.concurrent(io, count, .{ 1, 10, &results[0] }) catch |err| switch (err) {
        error.ConcurrencyUnavailable => {
            // Single-threaded builds can never grant a unit of concurrency,
            // so the error is expected there and the test is skipped.
            // NOTE(review): a multi-threaded implementation may also return
            // this error (e.g. concurrency limit reached or spawn failure),
            // which would fail this expectation — confirm the test
            // environment always has capacity for two tasks.
            try testing.expect(builtin.single_threaded);
            return;
        },
    };

    group.concurrent(io, count, .{ 20, 30, &results[1] }) catch |err| switch (err) {
        error.ConcurrencyUnavailable => {
            try testing.expect(builtin.single_threaded);
            return;
        },
    };

    // Both tasks are guaranteed started-or-assigned; block until they finish
    // so `results` is fully written before the check below.
    group.wait(io);

    // 45 and 245 are the expected outputs of `count` (defined elsewhere in
    // this file) for the ranges (1, 10) and (20, 30) respectively.
    try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
}
175201
test "select" {
176202
const io = testing.io;
177203

0 commit comments

Comments
 (0)