aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ashby <martin@ashbysoft.com>2023-09-28 10:54:58 +0100
committerMartin Ashby <martin@ashbysoft.com>2023-09-28 10:54:58 +0100
commit35494bc81b59165ee9264cd1004bb05a120279a3 (patch)
tree8d11946f8fc20e1f254301100e860f28bade5ee0
parent0c063d42430077881e563120ebfcf92c2cecf463 (diff)
downloadpgz-35494bc81b59165ee9264cd1004bb05a120279a3.tar.gz
pgz-35494bc81b59165ee9264cd1004bb05a120279a3.tar.bz2
pgz-35494bc81b59165ee9264cd1004bb05a120279a3.tar.xz
pgz-35494bc81b59165ee9264cd1004bb05a120279a3.zip
Reduce allocations, the message takes ownership of the bytes read from
the stream and is responsible for deallocating, rather than copying them to their own storage.
-rw-r--r--src/proto/authentication_request.zig11
-rw-r--r--src/proto/backend_key_data.zig10
-rw-r--r--src/proto/command_complete.zig13
-rw-r--r--src/proto/copy_x_response.zig6
-rw-r--r--src/proto/data_row.zig23
-rw-r--r--src/proto/error_response.zig5
-rw-r--r--src/proto/parameter_status.zig18
-rw-r--r--src/proto/proto.zig2
-rw-r--r--src/proto/ready_for_query.zig9
-rw-r--r--src/proto/row_description.zig35
10 files changed, 64 insertions, 68 deletions
diff --git a/src/proto/authentication_request.zig b/src/proto/authentication_request.zig
index 3ea5cd1..c4bddb5 100644
--- a/src/proto/authentication_request.zig
+++ b/src/proto/authentication_request.zig
@@ -25,12 +25,14 @@ pub const AuthRequestCleartextPassword = struct {};
inner_type: InnerAuthRequestType,
inner: InnerAuthRequest,
-pub fn read(_: std.mem.Allocator, b: []const u8) !AuthenticationRequest {
- if (b.len != 4) {
- log.err("invalid message length, expected 4 got {}", .{b.len});
+// takes ownership of b
+pub fn read(a: std.mem.Allocator, buf: []const u8) !AuthenticationRequest {
+ defer a.free(buf); // No need to retain it we copy all the interesting data out
+ if (buf.len != 4) {
+ log.err("invalid message length, expected 4 got {}", .{buf.len});
return ProtocolError.InvalidMessageLength;
}
- const auth_type_int = std.mem.readIntBig(u32, b[0..4]);
+ const auth_type_int = std.mem.readIntBig(u32, buf[0..4]);
const inner_type = enum_from_int(InnerAuthRequestType, auth_type_int) orelse {
log.err("Unsupported auth type {}", .{auth_type_int});
return ClientError.UnsupportedAuthType;
@@ -71,7 +73,6 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try AuthenticationRequest.read(allocator, buf);
defer sm2.deinit(allocator);
diff --git a/src/proto/backend_key_data.zig b/src/proto/backend_key_data.zig
index 7c32178..4e7f30d 100644
--- a/src/proto/backend_key_data.zig
+++ b/src/proto/backend_key_data.zig
@@ -10,11 +10,12 @@ pub const Tag: u8 = 'K';
process_id: u32,
secret_key: u32,
-pub fn read(_: std.mem.Allocator, b: []const u8) !BackendKeyData {
- if (b.len != 8) return ProtocolError.InvalidMessageLength;
+pub fn read(a: std.mem.Allocator, buf: []const u8) !BackendKeyData {
+ defer a.free(buf);
+ if (buf.len != 8) return ProtocolError.InvalidMessageLength;
return .{
- .process_id = std.mem.readIntBig(u32, b[0..4]),
- .secret_key = std.mem.readIntBig(u32, b[4..8]),
+ .process_id = std.mem.readIntBig(u32, buf[0..4]),
+ .secret_key = std.mem.readIntBig(u32, buf[4..8]),
};
}
pub fn write(self: BackendKeyData, _: std.mem.Allocator, stream_writer: anytype) !void {
@@ -43,7 +44,6 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try BackendKeyData.read(allocator, buf);
defer sm2.deinit(allocator);
diff --git a/src/proto/command_complete.zig b/src/proto/command_complete.zig
index 80014e9..ed8e052 100644
--- a/src/proto/command_complete.zig
+++ b/src/proto/command_complete.zig
@@ -10,24 +10,24 @@ pub const Tag: u8 = 'C';
const CommandComplete = @This();
+buf: ?[]const u8 = null, // owned
command_tag: []const u8,
-owned: bool = false,
-pub fn read(a: std.mem.Allocator, b: []const u8) !CommandComplete {
+pub fn read(_: std.mem.Allocator, buf: []const u8) !CommandComplete {
return .{
- .command_tag = try a.dupe(u8, b),
- .owned = true,
+ .buf = buf,
+ .command_tag = buf[0..],
};
}
pub fn write(self: CommandComplete, _: std.mem.Allocator, stream_writer: anytype) !void {
try stream_writer.writeByte(Tag);
- try stream_writer.writeIntBig(u32, @as(u32, @intCast(4+self.command_tag.len)));
+ try stream_writer.writeIntBig(u32, @as(u32, @intCast(4 + self.command_tag.len)));
try stream_writer.writeAll(self.command_tag);
}
pub fn deinit(self: *CommandComplete, a: std.mem.Allocator) void {
- if (self.owned) a.free(self.command_tag);
+ if (self.buf != null) a.free(self.buf.?);
}
test "round trip" {
@@ -47,7 +47,6 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try CommandComplete.read(allocator, buf);
defer sm2.deinit(allocator);
diff --git a/src/proto/copy_x_response.zig b/src/proto/copy_x_response.zig
index 9d1b26c..a4370a7 100644
--- a/src/proto/copy_x_response.zig
+++ b/src/proto/copy_x_response.zig
@@ -13,8 +13,9 @@ pub fn CopyXResponse(comptime tag: u8) type {
overall_format_code: u8,
format_codes: []const FormatCode, // owned
- pub fn read(a: std.mem.Allocator, b: []const u8) !@This() {
- var fbs = std.io.fixedBufferStream(b);
+ pub fn read(a: std.mem.Allocator, buf: []const u8) !@This() {
+ defer a.free(buf);
+ var fbs = std.io.fixedBufferStream(buf);
var reader = fbs.reader();
const overall_format_code = try reader.readIntBig(u8);
const n_columns = try reader.readIntBig(u16);
@@ -76,7 +77,6 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try CopyInResponse.read(allocator, buf);
defer sm2.deinit(allocator);
diff --git a/src/proto/data_row.zig b/src/proto/data_row.zig
index c20b794..43c4526 100644
--- a/src/proto/data_row.zig
+++ b/src/proto/data_row.zig
@@ -12,25 +12,23 @@ const DataRow = @This();
buf: ?[]const u8 = null, // owned
columns: [][]const u8, // also owned
-pub fn read(a: std.mem.Allocator, b: []const u8) !DataRow {
- if (b.len < 2) return ProtocolError.InvalidMessageLength;
- var buf = try a.dupe(u8, b);
- var res: DataRow = undefined;
- res.buf = buf;
- errdefer res.deinit(a);
-
+pub fn read(a: std.mem.Allocator, buf: []const u8) !DataRow {
+ if (buf.len < 2) return ProtocolError.InvalidMessageLength;
+ errdefer a.free(buf);
const n_columns = std.mem.readIntBig(u16, buf[0..2]);
const columns = try a.alloc([]const u8, n_columns);
errdefer a.free(columns);
var pos: usize = 2;
for (0..n_columns) |col| {
- const len = std.mem.readIntBig(u32, buf[pos..(pos+4)][0..4]); // second slice forces the slice size to be known at comptime and satisfy the type check on readIntBig!
- const data = if (len > 0) buf[(pos+4)..(pos+4+len)] else &[_]u8{};
+ const len = std.mem.readIntBig(u32, buf[pos..(pos + 4)][0..4]); // second slice forces the slice size to be known at comptime and satisfy the type check on readIntBig!
+ const data = if (len > 0) buf[(pos + 4)..(pos + 4 + len)] else &[_]u8{};
columns[col] = data;
- pos += (4+len);
+ pos += (4 + len);
}
- res.columns = columns;
- return res;
+ return .{
+ .buf = buf,
+ .columns = columns,
+ };
}
pub fn write(self: DataRow, a: std.mem.Allocator, stream_writer: anytype) !void {
@@ -77,7 +75,6 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try DataRow.read(allocator, buf);
defer sm2.deinit(allocator);
diff --git a/src/proto/error_response.zig b/src/proto/error_response.zig
index dc75053..58ca06e 100644
--- a/src/proto/error_response.zig
+++ b/src/proto/error_response.zig
@@ -27,13 +27,13 @@ line: ?u32 = null,
routine: ?[]const u8 = null,
unknown_fields: HMByteString,
-pub fn read(allocator: std.mem.Allocator, b: []const u8) !ErrorResponse {
+pub fn read(allocator: std.mem.Allocator, buf: []const u8) !ErrorResponse {
var res = ErrorResponse{
.severity = "",
.code = "",
.message = "",
.unknown_fields = HMByteString.init(allocator),
- .buf = try allocator.dupe(u8, b),
+ .buf = buf,
};
errdefer res.deinit(allocator);
var it = std.mem.splitScalar(u8, res.buf.?, 0);
@@ -178,7 +178,6 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try ErrorResponse.read(allocator, buf);
defer sm2.deinit(allocator);
diff --git a/src/proto/parameter_status.zig b/src/proto/parameter_status.zig
index 5f95695..7bc306b 100644
--- a/src/proto/parameter_status.zig
+++ b/src/proto/parameter_status.zig
@@ -11,13 +11,14 @@ buf: ?[]const u8 = null, // owned
name: []const u8,
value: []const u8,
-pub fn read(allocator: std.mem.Allocator, b: []const u8) !ParameterStatus {
- var res: ParameterStatus = undefined;
- res.buf = try allocator.dupe(u8, b);
- var it = std.mem.splitScalar(u8, res.buf.?, 0);
- res.name = it.first();
- res.value = it.next() orelse return ProtocolError.MissingField;
- return res;
+pub fn read(a: std.mem.Allocator, buf: []const u8) !ParameterStatus {
+ errdefer a.free(buf);
+ var it = std.mem.splitScalar(u8, buf, 0);
+ return .{
+ .buf = buf,
+ .name = it.first(),
+ .value = it.next() orelse return ProtocolError.MissingField,
+ };
}
pub fn write(self: ParameterStatus, a: std.mem.Allocator, stream_writer: anytype) !void {
try stream_writer.writeByte(Tag);
@@ -30,7 +31,7 @@ pub fn write(self: ParameterStatus, a: std.mem.Allocator, stream_writer: anytype
try writer.writeByte(0);
try writer.writeAll(self.value);
try writer.writeByte(0);
- std.mem.writeIntBig(u32, al.items[0..4], @as(u32,@intCast(cw.bytes_written))); // Fix length
+ std.mem.writeIntBig(u32, al.items[0..4], @as(u32, @intCast(cw.bytes_written))); // Fix length
try stream_writer.writeAll(al.items);
}
pub fn deinit(self: *ParameterStatus, allocator: std.mem.Allocator) void {
@@ -55,7 +56,6 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try ParameterStatus.read(allocator, buf);
defer sm2.deinit(allocator);
diff --git a/src/proto/proto.zig b/src/proto/proto.zig
index 5e4489d..9025347 100644
--- a/src/proto/proto.zig
+++ b/src/proto/proto.zig
@@ -51,13 +51,13 @@ pub fn read_message(allocator: std.mem.Allocator, stream_reader: anytype) !Backe
const tag = try stream_reader.readByte();
const len = try stream_reader.readIntBig(u32);
const buf = try allocator.alloc(u8, @as(u32, @intCast(len - 4)));
- defer allocator.free(buf);
try stream_reader.readNoEof(buf);
inline for (@typeInfo(BackendMessage).Union.fields) |field| {
if (field.type.Tag == tag) {
return @unionInit(BackendMessage, field.name, try field.type.read(allocator, buf));
}
} else {
+ allocator.free(buf);
return ProtocolError.InvalidMessageType;
}
}
diff --git a/src/proto/ready_for_query.zig b/src/proto/ready_for_query.zig
index ef99e60..6bf25f9 100644
--- a/src/proto/ready_for_query.zig
+++ b/src/proto/ready_for_query.zig
@@ -14,10 +14,10 @@ const TransactionStatus = enum(u8) {
transaction_status: TransactionStatus,
-pub fn read(allocator: std.mem.Allocator, b: []const u8) !ReadyForQuery {
- _ = allocator;
- if (b.len != 1) return ProtocolError.InvalidMessageLength;
- return .{ .transaction_status = enum_from_int(TransactionStatus, b[0]) orelse return ProtocolError.InvalidTransactionStatus };
+pub fn read(a: std.mem.Allocator, buf: []const u8) !ReadyForQuery {
+ defer a.free(buf);
+ if (buf.len != 1) return ProtocolError.InvalidMessageLength;
+ return .{ .transaction_status = enum_from_int(TransactionStatus, buf[0]) orelse return ProtocolError.InvalidTransactionStatus };
}
pub fn write(self: ReadyForQuery, allocator: std.mem.Allocator, stream_writer: anytype) !void {
_ = allocator;
@@ -44,7 +44,6 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try ReadyForQuery.read(allocator, buf);
defer sm2.deinit(allocator);
diff --git a/src/proto/row_description.zig b/src/proto/row_description.zig
index b8105e2..ff17716 100644
--- a/src/proto/row_description.zig
+++ b/src/proto/row_description.zig
@@ -11,7 +11,7 @@ pub const Tag: u8 = 'T';
const RowDescription = @This();
buf: ?[]const u8 = null, // owned
-fields: ?[]Field = null, // owned
+fields: []Field, // owned
pub const Field = struct {
name: []const u8,
@@ -23,19 +23,18 @@ pub const Field = struct {
format_code: FormatCode,
};
-pub fn read(a: std.mem.Allocator, b: []const u8) !RowDescription {
- var res: RowDescription = undefined;
- res.buf = try a.dupe(u8, b);
- errdefer res.deinit(a);
- var fbs = std.io.fixedBufferStream(res.buf.?);
+pub fn read(a: std.mem.Allocator, buf: []const u8) !RowDescription {
+ errdefer a.free(buf);
+ var fbs = std.io.fixedBufferStream(buf);
var reader = fbs.reader();
const n_fields = try reader.readIntBig(u16);
- res.fields = try a.alloc(Field, n_fields);
+ var fields = try a.alloc(Field, n_fields);
+ errdefer a.free(fields);
for (0..n_fields) |i| {
const name_start = fbs.pos;
try reader.skipUntilDelimiterOrEof(0);
const name_end = fbs.pos - 1;
- const name = res.buf.?[name_start..name_end];
+ const name = buf[name_start..name_end];
const field = Field{
.name = name,
.table_oid = try reader.readIntBig(u32),
@@ -45,9 +44,12 @@ pub fn read(a: std.mem.Allocator, b: []const u8) !RowDescription {
.data_type_modifier = try reader.readIntBig(u32),
.format_code = enum_from_int(FormatCode, try reader.readIntBig(u16)) orelse return ProtocolError.InvalidFormatCode,
};
- res.fields.?[i] = field;
+ fields[i] = field;
}
- return res;
+ return .{
+ .buf = buf,
+ .fields = fields,
+ };
}
pub fn write(self: RowDescription, a: std.mem.Allocator, stream_writer: anytype) !void {
@@ -57,8 +59,8 @@ pub fn write(self: RowDescription, a: std.mem.Allocator, stream_writer: anytype)
var cw = std.io.countingWriter(al.writer());
var writer = cw.writer();
try writer.writeIntBig(u32, 0); // length placeholder
- try writer.writeIntBig(u16, @as(u16, @intCast(self.fields.?.len)));
- for (self.fields.?) |field| {
+ try writer.writeIntBig(u16, @as(u16, @intCast(self.fields.len)));
+ for (self.fields) |field| {
try writer.writeAll(field.name);
try writer.writeByte(0);
try writer.writeIntBig(u32, field.table_oid);
@@ -73,7 +75,7 @@ pub fn write(self: RowDescription, a: std.mem.Allocator, stream_writer: anytype)
}
pub fn deinit(self: *RowDescription, a: std.mem.Allocator) void {
- if (self.fields != null) a.free(self.fields.?);
+ a.free(self.fields);
if (self.buf != null) a.free(self.buf.?);
}
@@ -125,12 +127,11 @@ test "round trip" {
try std.testing.expectEqual(Tag, tag);
const len = try reader.readIntBig(u32);
const buf = try allocator.alloc(u8, len - 4);
- defer allocator.free(buf);
try reader.readNoEof(buf);
var sm2 = try RowDescription.read(allocator, buf);
defer sm2.deinit(allocator);
- try std.testing.expectEqualDeep(f0, sm2.fields.?[0]);
- try std.testing.expectEqualDeep(f1, sm2.fields.?[1]);
- try std.testing.expectEqualDeep(f2, sm2.fields.?[2]);
+ try std.testing.expectEqualDeep(f0, sm2.fields[0]);
+ try std.testing.expectEqualDeep(f1, sm2.fields[1]);
+ try std.testing.expectEqualDeep(f2, sm2.fields[2]);
}