aboutsummaryrefslogtreecommitdiff
path: root/src/conn
diff options
context:
space:
mode:
authorMartin Ashby <martin@ashbysoft.com>2023-09-28 09:25:31 +0100
committerMartin Ashby <martin@ashbysoft.com>2023-09-28 09:25:31 +0100
commit0fe9a86daf043c338b46d215b8f2457f64fa0446 (patch)
tree07a485d18030b0ba06ddb2f0e8933a4e917a2747 /src/conn
parent747c6e55cbe2283fd85ef8cd930e88d2bb0b7db2 (diff)
downloadpgz-0fe9a86daf043c338b46d215b8f2457f64fa0446.tar.gz
pgz-0fe9a86daf043c338b46d215b8f2457f64fa0446.tar.bz2
pgz-0fe9a86daf043c338b46d215b8f2457f64fa0446.tar.xz
pgz-0fe9a86daf043c338b46d215b8f2457f64fa0446.zip
Start adding Conn#receiveMessage
Diffstat (limited to 'src/conn')
-rw-r--r--src/conn/conn.zig64
1 files changed, 47 insertions, 17 deletions
diff --git a/src/conn/conn.zig b/src/conn/conn.zig
index d97eca2..037868e 100644
--- a/src/conn/conn.zig
+++ b/src/conn/conn.zig
@@ -22,6 +22,7 @@ const ConnStatus = enum {
connStatusBusy,
};
+allocator: std.mem.Allocator,
stream: std.net.Stream,
config: Config,
status: ConnStatus = .connStatusUninitialized,
@@ -94,6 +95,35 @@ pub fn connect(config: Config) !Conn {
return res;
}
+fn receiveMessage(self: *Conn) !BackendMessage {
+ var anymsg = try read_message(self., reader);
+ defer anymsg.deinit(allocator);
+ switch (anymsg) {
+ .ReadyForQuery =>
+ pgConn.txStatus = msg.TxStatus
+ case *pgproto3.ParameterStatus:
+ pgConn.parameterStatuses[msg.Name] = msg.Value
+ case *pgproto3.ErrorResponse:
+ if msg.Severity == "FATAL" {
+ pgConn.status = connStatusClosed
+ pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
+ close(pgConn.cleanupDone)
+ return nil, ErrorResponseToPgError(msg)
+ }
+ case *pgproto3.NoticeResponse:
+ if pgConn.config.OnNotice != nil {
+ pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg))
+ }
+ case *pgproto3.NotificationResponse:
+ if pgConn.config.OnNotification != nil {
+ pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload})
+ }
+ }
+
+ return msg, nil
+}
+
+
pub fn deinit(self: *Conn) void {
self.stream.close();
}
@@ -101,24 +131,24 @@ pub fn deinit(self: *Conn) void {
// How to handle this ...
// The Go code relies on polymorphism to generically read any message type.
// I _could_ have a tagged union type thing
-// pub const ResultIterator = struct {
-// conn: *Conn,
-// command_concluded: bool = false,
-// // NextRow advances the ResultReader to the next row and returns true if a row is available.
-// pub fn next_row(self: *ResultIterator) bool {
-// // TODO implement
-// var reader = self.conn.stream.reader();
-// switch (try reader.readByte()) {
-// case
-// }
-// return false;
-// }
-// };
+pub const ResultIterator = struct {
+ conn: *Conn,
+ command_concluded: bool = false,
+ // NextRow advances the ResultReader to the next row and returns true if a row is available.
+ pub fn next_row(self: *ResultIterator) bool {
+ // TODO implement
+ var reader = self.conn.stream.reader();
+ switch (try reader.readByte()) {
+ case
+ }
+ return false;
+ }
+};
-// pub const MultiResultIterator = struct {
-// conn: *Conn,
-// fn next() ?
-// };
+pub const MultiResultIterator = struct {
+ conn: *Conn,
+ fn next() ?
+};
// pub fn exec(self: *Conn) {