aboutsummaryrefslogtreecommitdiff
path: root/src/conn/conn.zig
diff options
context:
space:
mode:
authorMartin Ashby <martin@ashbysoft.com>2023-09-28 10:38:25 +0100
committerMartin Ashby <martin@ashbysoft.com>2023-09-28 10:38:25 +0100
commit0c063d42430077881e563120ebfcf92c2cecf463 (patch)
treef4cec6018558f058051bc411c59ee0ae818e46d4 /src/conn/conn.zig
parent0fe9a86daf043c338b46d215b8f2457f64fa0446 (diff)
downloadpgz-0c063d42430077881e563120ebfcf92c2cecf463.tar.gz
pgz-0c063d42430077881e563120ebfcf92c2cecf463.tar.bz2
pgz-0c063d42430077881e563120ebfcf92c2cecf463.tar.xz
pgz-0c063d42430077881e563120ebfcf92c2cecf463.zip
More work on conn.zig
Diffstat (limited to 'src/conn/conn.zig')
-rw-r--r--src/conn/conn.zig120
1 files changed, 55 insertions, 65 deletions
diff --git a/src/conn/conn.zig b/src/conn/conn.zig
index 037868e..a5a5ac6 100644
--- a/src/conn/conn.zig
+++ b/src/conn/conn.zig
@@ -34,6 +34,7 @@ pub fn connect(config: Config) !Conn {
.unix => |path| try std.net.connectUnixSocket(path),
};
var res = Conn{
+ .allocator = allocator,
.stream = stream,
.config = config,
};
@@ -41,6 +42,7 @@ pub fn connect(config: Config) !Conn {
var writer = stream.writer();
var dr = diagnosticReader(100, stream.reader());
var reader = dr.reader();
+ _ = reader;
var params = SSHashMap.init(allocator);
try params.put("user", config.user);
if (config.database) |database| try params.put("database", database);
@@ -50,16 +52,15 @@ pub fn connect(config: Config) !Conn {
defer sm.deinit(allocator);
try sm.write(allocator, writer);
lp: while (true) {
- var anymsg = try read_message(allocator, reader);
+ var anymsg = try res.receiveMessage();
defer anymsg.deinit(allocator);
switch (anymsg) {
- .ErrorResponse => |err| {
- log.err("Error connecting to server {any}", .{err});
- return ServerError.ErrorResponse;
+ .ReadyForQuery => {
+ break :lp;
},
.AuthenticationRequest => |ar| {
switch (ar.inner_type) {
- .AuthRequestTypeOk => {}, // fine do nothing!
+ .AuthRequestTypeOk => {},
.AuthRequestTypeCleartextPassword => {
if (config.password) |password| {
const pm = PasswordMessage{ .password = password };
@@ -70,60 +71,49 @@ pub fn connect(config: Config) !Conn {
},
}
},
- .ReadyForQuery => |rfq| {
- // TODO do something about transaction state?
- res.status = .connStatusIdle;
- log.info("ready for query {any}", .{rfq});
- break :lp;
- },
- .ParameterStatus => |ps| {
- // TODO Handle this somehow?
- log.info("ParameterStatus: {s}:{s}", .{ ps.name, ps.value });
- },
- .BackendKeyData => |bkd| {
- log.info("BackendKeyData process_id {} secret_key {}", .{ bkd.process_id, bkd.secret_key });
- },
- else => |response_type| {
- log.err("unhandled message type [{}]", .{response_type});
- const diag = try dr.get(allocator);
- defer allocator.free(diag);
- log.err("diag [{s}]", .{diag});
- return ProtocolError.WrongMessageType;
+ else => {
+ // deliberately do nothing, we must wait for ready before the connection can be used.
},
}
}
return res;
}
+// Messages should always be received through this function.
+// this'll handle generic stuff that should happen on the connection
fn receiveMessage(self: *Conn) !BackendMessage {
- var anymsg = try read_message(self., reader);
- defer anymsg.deinit(allocator);
+ var anymsg = try read_message(self.allocator, self.stream.reader());
+ errdefer anymsg.deinit(self.allocator);
switch (anymsg) {
- .ReadyForQuery =>
- pgConn.txStatus = msg.TxStatus
- case *pgproto3.ParameterStatus:
- pgConn.parameterStatuses[msg.Name] = msg.Value
- case *pgproto3.ErrorResponse:
- if msg.Severity == "FATAL" {
- pgConn.status = connStatusClosed
- pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
- close(pgConn.cleanupDone)
- return nil, ErrorResponseToPgError(msg)
- }
- case *pgproto3.NoticeResponse:
- if pgConn.config.OnNotice != nil {
- pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg))
- }
- case *pgproto3.NotificationResponse:
- if pgConn.config.OnNotification != nil {
- pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload})
- }
+ .ReadyForQuery => {
+ // TODO handle TxStatus
+ },
+ .ParameterStatus => {
+ // TODO handle parameter status
+ },
+ .ErrorResponse => |err| {
+ if (std.mem.eql(u8, err.severity, "FATAL")) {
+ self.status = .connStatusClosed;
+ // TODO close the connection here? But it should really be the caller's responsiblilty
+ return ServerError.ErrorResponse;
+ }
+ },
+ // .NoticeResponse => {
+ // // TODO handle notice response
+ // },
+ // .NotificationResponse => {
+ // // TODO handle notificationResponse
+ // },
+ .BackendKeyData => {
+ // TODO handle backend key data
+ },
+ else => {
+ // deliberately do nothing, caller can presumably handle them.
+ },
}
-
- return msg, nil
+ return anymsg;
}
-
pub fn deinit(self: *Conn) void {
self.stream.close();
}
@@ -131,24 +121,24 @@ pub fn deinit(self: *Conn) void {
// How to handle this ...
// The Go code relies on polymorphism to generically read any message type.
// I _could_ have a tagged union type thing
-pub const ResultIterator = struct {
- conn: *Conn,
- command_concluded: bool = false,
- // NextRow advances the ResultReader to the next row and returns true if a row is available.
- pub fn next_row(self: *ResultIterator) bool {
- // TODO implement
- var reader = self.conn.stream.reader();
- switch (try reader.readByte()) {
- case
- }
- return false;
- }
-};
+// pub const ResultIterator = struct {
+// conn: *Conn,
+// command_concluded: bool = false,
+// // NextRow advances the ResultReader to the next row and returns true if a row is available.
+// pub fn next_row(self: *ResultIterator) bool {
+// // TODO implement
+// var reader = self.conn.stream.reader();
+// switch (try reader.readByte()) {
+// case
+// }
+// return false;
+// }
+// };
-pub const MultiResultIterator = struct {
- conn: *Conn,
- fn next() ?
-};
+// pub const MultiResultIterator = struct {
+// conn: *Conn,
+// fn next() ?
+// };
// pub fn exec(self: *Conn) {