aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ashby <martin@ashbysoft.com>2023-09-28 20:46:10 +0100
committerMartin Ashby <martin@ashbysoft.com>2023-09-28 20:46:10 +0100
commitc00a7cd57be154b5a770a397319c8c8ad35c98b6 (patch)
tree89ef0f7bef322574883260c8952e0f3f71fa40ff
parentd1435847fa68a548f8a2b8c61857691c6f5ac924 (diff)
downloadpgz-c00a7cd57be154b5a770a397319c8c8ad35c98b6.tar.gz
pgz-c00a7cd57be154b5a770a397319c8c8ad35c98b6.tar.bz2
pgz-c00a7cd57be154b5a770a397319c8c8ad35c98b6.tar.xz
pgz-c00a7cd57be154b5a770a397319c8c8ad35c98b6.zip
WIP
-rw-r--r--src/conn/conn.zig72
-rw-r--r--src/proto/command_complete.zig7
-rw-r--r--src/proto/data_row.zig8
-rw-r--r--src/proto/row_description.zig8
4 files changed, 68 insertions, 27 deletions
diff --git a/src/conn/conn.zig b/src/conn/conn.zig
index fbde06c..db5084c 100644
--- a/src/conn/conn.zig
+++ b/src/conn/conn.zig
@@ -125,56 +125,74 @@ pub const ResultIterator = struct {
row_description: ?proto.RowDescription = null,
current_datarow: ?proto.DataRow = null,
command_complete: ?proto.CommandComplete = null,
+ pub fn init(conn: *Conn) ResultIterator {
+ return .{
+ .conn = conn,
+ };
+ }
pub fn deinit(self: *ResultIterator) void {
if (self.row_description != null) self.row_description.?.deinit(self.conn.allocator);
if (self.current_datarow != null) self.current_datarow.?.deinit(self.conn.allocator);
if (self.command_complete != null) self.command_complete.?.deinit(self.conn.allocator);
}
- // NextRow advances the ResultIterator to the next row and returns true if a row is available.
- pub fn next_row(self: *ResultIterator) !bool {
+ // NextRow advances the ResultIterator to the next row and returns a row if one is available.
+ // or null if we've reached the end of the reuslt.
+ pub fn next_row(self: *ResultIterator) !?[][]const u8 {
while (self.command_complete == null) {
- var msg = try self.conn.receive_message();
+ var msg = try self.receive_message();
switch (msg) {
- .DataRow => |dr| {
- if (self.row_description != null) self.row_description.?.deinit(self.conn.allocator);
- self.current_datarow = dr;
- return true;
- },
- .RowDescription => |rd| {
- if (self.row_description != null) return ProtocolError.UnexpectedMessage;
- self.row_description = rd;
- },
- .CommandComplete => |cc| {
- if (self.command_complete != null) return ProtocolError.UnexpectedMessage;
- self.command_complete = cc;
+ .DataRow => {
+ return self.current_datarow.?.columns;
},
else => {
msg.deinit(self.conn.allocator);
},
}
}
- return false;
+ return null;
+ }
+
+ pub fn skip_to_end(self: *ResultIterator) !void {
+ while (self.command_complete == null) {
+ _ = try self.receive_message();
+ }
}
- // row returns the current row data
- pub fn row(self: ResultIterator) ?[][]const u8 {
- return if (self.current_datarow) |dr| dr.columns else null;
+
+ fn receive_message(self: *ResultIterator) !BackendMessage {
+ var msg = if (self.multi_iterator == null) try self.conn.receive_message() else try self.multi_iterator.?.receive_message();
+ switch (msg) {
+ .DataRow => |dr| {
+ if (self.current_datarow != null) self.current_datarow.?.deinit(self.conn.allocator);
+ self.current_datarow = try dr.clone(self.conn.allocator);
+ },
+ .RowDescription => |rd| {
+ if (self.row_description != null) return ProtocolError.UnexpectedMessage;
+ self.row_description = try rd.clone(self.conn.allocator);
+ },
+ .CommandComplete => |cc| {
+ if (self.command_complete != null) return ProtocolError.UnexpectedMessage;
+ self.command_complete = try cc.clone(self.conn.allocator);
+ },
+ }
+ return msg;
}
};
pub const MultiResultIterator = struct {
conn: *Conn,
- cri: ?*ResultIterator = null,
+ cri: ?*ResultIterator,
- pub fn next_result(self: *MultiResultIterator) !bool {
- if ()
+ // returns the next result iterator, or null if we've reached the end of the results
+ pub fn next_result(self: *MultiResultIterator) !?*ResultIterator {
+ if (self.cri != null) {
+ try self.cri.?.skip_to_end();
+ }
}
- pub fn result(self: MultiResultIterator) ?*ResultIterator {
- return self.cri;
- }
fn receive_message(self: *MultiResultIterator) !BackendMessage {
- _ = self;
- return error.NotImplemented;
+ var msg = try self.conn.receive_message();
+ switch (msg) {}
+ return msg;
}
};
diff --git a/src/proto/command_complete.zig b/src/proto/command_complete.zig
index ed8e052..f9a9e26 100644
--- a/src/proto/command_complete.zig
+++ b/src/proto/command_complete.zig
@@ -30,6 +30,13 @@ pub fn deinit(self: *CommandComplete, a: std.mem.Allocator) void {
if (self.buf != null) a.free(self.buf.?);
}
+pub fn clone(self: CommandComplete, a: std.mem.Allocator) !CommandComplete {
+ var ba = ByteArrayList.init(a);
+ errdefer ba.deinit();
+ try self.write(a, ba.writer());
+ return try CommandComplete.read(a, ba.items);
+}
+
test "round trip" {
const allocator = std.testing.allocator;
var sm = CommandComplete{
diff --git a/src/proto/data_row.zig b/src/proto/data_row.zig
index 43c4526..6bfcc1d 100644
--- a/src/proto/data_row.zig
+++ b/src/proto/data_row.zig
@@ -54,6 +54,14 @@ pub fn deinit(self: *DataRow, a: std.mem.Allocator) void {
a.free(self.columns);
}
+// Caller owns the new DataRow.
+pub fn clone(self: DataRow, a: std.mem.Allocator) !DataRow {
+ var ba = ByteArrayList.init(a);
+ errdefer ba.deinit();
+ try self.write(a, ba.writer());
+ return try DataRow.read(a, ba.items);
+}
+
test "round trip" {
const allocator = std.testing.allocator;
const columns = try allocator.alloc([]const u8, 3);
diff --git a/src/proto/row_description.zig b/src/proto/row_description.zig
index ff17716..50e4cb0 100644
--- a/src/proto/row_description.zig
+++ b/src/proto/row_description.zig
@@ -74,6 +74,14 @@ pub fn write(self: RowDescription, a: std.mem.Allocator, stream_writer: anytype)
try stream_writer.writeAll(al.items);
}
+// Caller owns the result.
+pub fn clone(self: RowDescription, a: std.mem.Allocator) !RowDescription {
+ var ba = ByteArrayList.init(a);
+ errdefer ba.deinit();
+ try self.write(a, ba.writer());
+ return try RowDescription.read(a, ba.items);
+}
+
pub fn deinit(self: *RowDescription, a: std.mem.Allocator) void {
a.free(self.fields);
if (self.buf != null) a.free(self.buf.?);