commit e9943f6c98aa1f61f8bbe318a1fd30b2f0284b7b
parent aeeabc0b34c21bb435a4243f17e37c577411a1ce
Author: Martin Ashby <martin@ashbysoft.com>
Date: Tue, 30 Jan 2024 22:48:12 +0000
Use threads to make it faster
Diffstat:
3 files changed, 128 insertions(+), 75 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -1,2 +1,3 @@
zig-out
zig-cache/
+perf.data
diff --git a/src/main.zig b/src/main.zig
@@ -26,23 +26,122 @@ pub fn main() !void {
std.log.info("finished at {} s", .{t.read() / std.time.ns_per_s});
}
-fn run(a: std.mem.Allocator, infile: []const u8, t: *std.time.Timer) ![]const u8 {
-
- std.log.info("mmap done, iterating!", .{});
- var res = std.StringArrayHashMap(Accumulator).init(a);
+fn run(a: std.mem.Allocator, infile: []const u8, _: *std.time.Timer) ![]const u8 {
+ const threadcount = try std.Thread.getCpuCount();
+ var ress = try a.alloc(std.StringArrayHashMap(Accumulator), threadcount);
+ defer a.free(ress);
+ var threads = try a.alloc(std.Thread, threadcount);
+ defer a.free(threads);
+ var threadnames = try a.alloc([]const u8, threadcount);
+ defer a.free(threadnames);
+
+ var start: usize = 0;
+ for (0..threadcount) |i| {
+ ress[i] = std.StringArrayHashMap(Accumulator).init(a);
+ var end = ((infile.len * (i+1)) / threadcount);
+ while (infile.len > end and infile[end] != '\n') end += 1;
+ const infile_part = infile[start..end];
+ const threadname = try std.fmt.allocPrint(a, "threads {}", .{i});
+ threadnames[i] = threadname;
+ threads[i] = try std.Thread.spawn(.{}, run_part, .{&ress[i], infile_part, threadname});
+ start = end;
+ }
defer {
- var it = res.iterator();
- while (it.next()) |e| {
- a.free(e.key_ptr.*);
+ for (0..threadcount) |i| {
+ defer free_keys_and_deinit(&ress[i]);
+ a.free(threadnames[i]);
}
- res.deinit();
}
+ for (0..threadcount) |i| {
+ threads[i].join();
+ }
+ // Now merge the results
+ var res = std.StringArrayHashMap(Accumulator).init(a);
+ defer res.deinit(); // Doesn't own it's own keys
+ for (0..threadcount) |i| {
+ try merge_in(&res, &ress[i]);
+ }
+
+ // Sort and print
+ const Srt = struct {
+ keys: [][]const u8,
+ pub fn lessThan(self: @This(), a_index: usize, b_index: usize) bool {
+ // character value order!
+ return std.mem.order(u8, self.keys[a_index], self.keys[b_index]).compare(.lt);
+ }
+ };
+ res.sort(Srt{.keys = res.keys()});
+
+ var rr = std.ArrayList(u8).init(a);
+ defer rr.deinit();
+ var ww = rr.writer();
+ try ww.writeAll("{");
+ var it = res.iterator();
+ while (it.next()) |nxt| {
+ const k = nxt.key_ptr.*;
+ try ww.writeAll(k);
+ try ww.writeAll("=");
+ const v = nxt.value_ptr.*;
+ const mm = @as(i32, v.min) - 999;
+ try std.fmt.format(ww, "{}.{}", .{@divTrunc(mm ,10),@abs(@rem(mm, 10))});
+ try ww.writeAll("/");
+ const mx = @as(i32, v.max) - 999;
+ try std.fmt.format(ww, "{}.{}", .{@divTrunc(mx,10), @abs(@rem(mx, 10))});
+ try ww.writeAll("/");
+ const s_1 = v.sum / v.count; // mean
+ const s_2 = @as(i64, @intCast(s_1)) - 999; // shift
+ // std.log.warn("s_2 {}", .{s_2});
+ try std.fmt.format(ww, "{}.{}", .{@divTrunc(s_2 ,10), @abs(@rem(s_2, 10))}); // scale
+ //try std.fmt.format(ww, " ct {} sum {}", .{v.count, v.sum});
+ try ww.writeAll(", ");
+ }
+ try ww.writeAll("}");
+ return try rr.toOwnedSlice();
+}
+
+fn merge_in(res_f: *std.StringArrayHashMap(Accumulator), res_a: *std.StringArrayHashMap(Accumulator)) !void {
+ var it = res_a.iterator();
+ while (it.next()) |e| {
+ const r = e.value_ptr.*;
+ const gpr = try res_f.getOrPut(e.key_ptr.*);
+ if (gpr.found_existing) {
+ const rr = gpr.value_ptr.*;
+ gpr.value_ptr.* = Accumulator{
+ .min = @min(rr.min, r.min),
+ .max = @max(rr.max, r.max),
+ .sum = rr.sum + r.sum,
+ .count = rr.count + r.count,
+ };
+ } else {
+ gpr.value_ptr.* = r;
+ }
+ }
+}
+
+const Accumulator = struct {
+ min: u16,
+ max: u16,
+ sum: u64,
+ count: u64,
+};
+
+fn free_keys_and_deinit(hm: *std.StringArrayHashMap(Accumulator)) void {
+ for (hm.keys()) |*k| {
+ hm.allocator.free(k.*);
+ }
+ hm.deinit();
+}
+
+fn run_part(res: *std.StringArrayHashMap(Accumulator), infile: []const u8, name: []const u8) !void {
+ var t = try std.time.Timer.start(); // I know it's supported on my platform
var lines = std.mem.tokenizeScalar(u8, infile, '\n');
var ct: usize = 0;
while (lines.next()) |line| {
ct += 1;
- if (ct % 100000 == 0) {
- std.log.info("processed {} lines at {} seconds", .{ct, t.read() / std.time.ns_per_s});
+ if (ct % 1000000 == 0) {
+ const sec = t.read() / std.time.ns_per_s;
+ const rows_sec = ct / sec;
+ std.log.info("thread {s} processed {} lines at {} seconds, rate {} rows / sec", .{name, ct, sec, rows_sec});
}
var spl = std.mem.splitScalar(u8, line, ';');
const key = spl.first();
@@ -78,7 +177,7 @@ fn run(a: std.mem.Allocator, infile: []const u8, t: *std.time.Timer) ![]const u8
.count = e.count + 1,
};
} else {
- const kd = try a.dupe(u8, key);
+ const kd = try res.allocator.dupe(u8, key);
try res.put(kd,.{
.min = val,
.max = val,
@@ -86,56 +185,9 @@ fn run(a: std.mem.Allocator, infile: []const u8, t: *std.time.Timer) ![]const u8
.count = 1,
});
}
- }
-
- // Go theough the keys sorted
- // OK so i think I should use integers rather than actual floating point values.
- // -999 -> 999 maps to positive only 0 -> 1998
- // so I guess go with u16?
-
- const Srt = struct {
- keys: [][]const u8,
- pub fn lessThan(self: @This(), a_index: usize, b_index: usize) bool {
- // character value order!
- return std.mem.order(u8, self.keys[a_index], self.keys[b_index]).compare(.lt);
- }
- };
- res.sort(Srt{.keys = res.keys()});
-
- var rr = std.ArrayList(u8).init(a);
- defer rr.deinit();
- var ww = rr.writer();
- try ww.writeAll("{");
- var it = res.iterator();
- while (it.next()) |nxt| {
- const k = nxt.key_ptr.*;
- try ww.writeAll(k);
- try ww.writeAll("=");
- const v = nxt.value_ptr.*;
- const mm = @as(i32, v.min) - 999;
- try std.fmt.format(ww, "{}.{}", .{@divTrunc(mm ,10),@abs(@rem(mm, 10))});
- try ww.writeAll("/");
- const mx = @as(i32, v.max) - 999;
- try std.fmt.format(ww, "{}.{}", .{@divTrunc(mx,10), @abs(@rem(mx, 10))});
- try ww.writeAll("/");
- const s_1 = v.sum / v.count; // mean
- const s_2 = @as(i64, @intCast(s_1)) - 999; // shift
- // std.log.warn("s_2 {}", .{s_2});
- try std.fmt.format(ww, "{}.{}", .{@divTrunc(s_2 ,10), @abs(@rem(s_2, 10))}); // scale
- //try std.fmt.format(ww, " ct {} sum {}", .{v.count, v.sum});
- try ww.writeAll(", ");
- }
- try ww.writeAll("}");
- return try rr.toOwnedSlice();
+ }
}
-const Accumulator = struct {
- min: u16,
- max: u16,
- sum: u64,
- count: u64,
-};
-
// Result must be closed with std.os.munmap
fn open_mmap(dir: std.fs.Dir, file_path: []const u8) ![]align(std.mem.page_size) u8 {
var f = try dir.openFile(file_path, .{ .mode = .read_only });
@@ -146,22 +198,7 @@ fn open_mmap(dir: std.fs.Dir, file_path: []const u8) ![]align(std.mem.page_size)
test {
- const test_input =
- \\Hamburg;12.0
- \\Bulawayo;8.9
- \\Palembang;38.8
- \\St. John's;15.2
- \\Cracow;12.6
- \\Bridgetown;26.9
- \\Roseau;34.4
- \\Conakry;31.2
- \\Istanbul;23.0
- \\Istanbul;-3.0
- \\Istanbul;-9.0
- \\Istanbul;-10.0
- \\Istanbul;-15.0
- \\Istanbul;6.2
- ;
+ const test_input = @embedFile("measurement_test.txt");
const test_output =
\\{Bridgetown=26.9/26.9/26.9, Bulawayo=8.9/8.9/8.9, Conakry=31.2/31.2/31.2, Cracow=12.6/12.6/12.6, Hamburg=12.0/12.0/12.0, Istanbul=-15.0/23.0/-1.3, Palembang=38.8/38.8/38.8, Roseau=34.4/34.4/34.4, St. John's=15.2/15.2/15.2, }
;
diff --git a/src/measurement_test.txt b/src/measurement_test.txt
@@ -0,0 +1,14 @@
+Istanbul;23.0
+Istanbul;-3.0
+Istanbul;-9.0
+Hamburg;12.0
+Bulawayo;8.9
+Palembang;38.8
+St. John's;15.2
+Cracow;12.6
+Bridgetown;26.9
+Roseau;34.4
+Conakry;31.2
+Istanbul;-10.0
+Istanbul;-15.0
+Istanbul;6.2
+\ No newline at end of file