From 0fe9a86daf043c338b46d215b8f2457f64fa0446 Mon Sep 17 00:00:00 2001 From: Martin Ashby Date: Thu, 28 Sep 2023 09:25:31 +0100 Subject: Start adding Conn#receiveMessage --- src/conn/conn.zig | 64 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/src/conn/conn.zig b/src/conn/conn.zig index d97eca2..037868e 100644 --- a/src/conn/conn.zig +++ b/src/conn/conn.zig @@ -22,6 +22,7 @@ const ConnStatus = enum { connStatusBusy, }; +allocator: std.mem.Allocator, stream: std.net.Stream, config: Config, status: ConnStatus = .connStatusUninitialized, @@ -94,6 +95,35 @@ pub fn connect(config: Config) !Conn { return res; } +fn receiveMessage(self: *Conn) !BackendMessage { + var anymsg = try read_message(self., reader); + defer anymsg.deinit(allocator); + switch (anymsg) { + .ReadyForQuery => + pgConn.txStatus = msg.TxStatus + case *pgproto3.ParameterStatus: + pgConn.parameterStatuses[msg.Name] = msg.Value + case *pgproto3.ErrorResponse: + if msg.Severity == "FATAL" { + pgConn.status = connStatusClosed + pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return. + close(pgConn.cleanupDone) + return nil, ErrorResponseToPgError(msg) + } + case *pgproto3.NoticeResponse: + if pgConn.config.OnNotice != nil { + pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg)) + } + case *pgproto3.NotificationResponse: + if pgConn.config.OnNotification != nil { + pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload}) + } + } + + return msg, nil +} + + pub fn deinit(self: *Conn) void { self.stream.close(); } @@ -101,24 +131,24 @@ pub fn deinit(self: *Conn) void { // How to handle this ... // The Go code relies on polymorphism to generically read any message type. // I _could_ have a tagged union type thing -// pub const ResultIterator = struct { -// conn: *Conn, -// command_concluded: bool = false, -// // NextRow advances the ResultReader to the next row and returns true if a row is available. -// pub fn next_row(self: *ResultIterator) bool { -// // TODO implement -// var reader = self.conn.stream.reader(); -// switch (try reader.readByte()) { -// case -// } -// return false; -// } -// }; +pub const ResultIterator = struct { + conn: *Conn, + command_concluded: bool = false, + // NextRow advances the ResultReader to the next row and returns true if a row is available. + pub fn next_row(self: *ResultIterator) bool { + // TODO implement + var reader = self.conn.stream.reader(); + switch (try reader.readByte()) { + case + } + return false; + } +}; -// pub const MultiResultIterator = struct { -// conn: *Conn, -// fn next() ? -// }; +pub const MultiResultIterator = struct { + conn: *Conn, + fn next() ? +}; // pub fn exec(self: *Conn) { -- cgit v1.2.3-ZIG