diff options
-rw-r--r-- | src/data_row.zig | 88 | ||||
-rw-r--r-- | src/main.zig | 2 |
2 files changed, 90 insertions, 0 deletions
diff --git a/src/data_row.zig b/src/data_row.zig new file mode 100644 index 0000000..558ebca --- /dev/null +++ b/src/data_row.zig @@ -0,0 +1,88 @@ +const std = @import("std"); +const log = std.log.scoped(.pgz); +const ByteArrayList = std.ArrayList(u8); +const ProtocolError = @import("main.zig").ProtocolError; +const ClientError = @import("main.zig").ClientError; +const enum_from_int = @import("main.zig").enum_from_int; + +pub const Tag: u8 = 'D'; + +const DataRow = @This(); + +buf: ?[]const u8 = null, // owned +columns: [][]const u8, // also owned + +pub fn read(a: std.mem.Allocator, b: []const u8) !DataRow { + if (b.len < 2) return ProtocolError.InvalidMessageLength; + var buf = try a.dupe(u8, b); + var res: DataRow = undefined; + res.buf = buf; + errdefer res.deinit(a); + + const n_columns = std.mem.readIntBig(u16, buf[0..2]); + const columns = try a.alloc([]const u8, n_columns); + errdefer a.free(columns); + var pos: usize = 2; + for (0..n_columns) |col| { + const len = std.mem.readIntBig(u32, buf[pos..(pos+4)][0..4]); // second slice forces the slice size to be known at comptime and satisfy the type check on readIntBig! + const data = if (len > 0) buf[(pos+4)..(pos+4+len)] else &[_]u8{}; + columns[col] = data; + pos += (4+len); + } + res.columns = columns; + return res; +} + +pub fn write(self: DataRow, a: std.mem.Allocator, stream_writer: anytype) !void { + try stream_writer.writeByte(Tag); + var al = ByteArrayList.init(a); + defer al.deinit(); + var cw = std.io.countingWriter(al.writer()); + var writer = cw.writer(); + try writer.writeIntBig(u32, 0); // length placeholder + try writer.writeIntBig(u16, @as(u16, @intCast(self.columns.len))); + for (self.columns) |column| { + const len = @as(u32, @intCast(column.len)); + try writer.writeIntBig(u32, len); + try writer.writeAll(column); + } + // Fixup the length and write to the original stream + std.mem.writeIntBig(u32, al.items[0..4], @as(u32, @intCast(cw.bytes_written))); + try stream_writer.writeAll(al.items); +} + +pub fn deinit(self: *DataRow, a: std.mem.Allocator) void { + if (self.buf != null) a.free(self.buf.?); + a.free(self.columns); +} + +test "round trip" { + const allocator = std.testing.allocator; + const columns = try allocator.alloc([]const u8, 3); + columns[0] = "Hello"; + columns[1] = "FooBar"; + columns[2] = ""; + var sm = DataRow{ + .columns = columns, + }; + defer sm.deinit(allocator); + + var bal = ByteArrayList.init(allocator); + defer bal.deinit(); + try sm.write(allocator, bal.writer()); + + var fbs = std.io.fixedBufferStream(bal.items); + var reader = fbs.reader(); + const tag = try reader.readByte(); + try std.testing.expectEqual(Tag, tag); + const len = try reader.readIntBig(u32); + const buf = try allocator.alloc(u8, len - 4); + defer allocator.free(buf); + try reader.readNoEof(buf); + var sm2 = try DataRow.read(allocator, buf); + defer sm2.deinit(allocator); + + try std.testing.expectEqualStrings("Hello", sm2.columns[0]); + try std.testing.expectEqualStrings("FooBar", sm2.columns[1]); + try std.testing.expectEqualStrings("", sm2.columns[2]); +} diff --git a/src/main.zig b/src/main.zig index 86e1983..07b6628 100644 --- a/src/main.zig +++ b/src/main.zig @@ -8,6 +8,7 @@ const ReadyForQuery = @import("ready_for_query.zig"); const ParameterStatus = @import("parameter_status.zig"); const BackendKeyData = @import("backend_key_data.zig"); const Query = @import("query.zig"); +const DataRow = @import("data_row.zig"); const Conn = @import("conn.zig"); pub const ProtocolError = error{ @@ -117,4 +118,5 @@ test { _ = ParameterStatus; _ = BackendKeyData; _ = Query; + _ = DataRow; } |