From 0c063d42430077881e563120ebfcf92c2cecf463 Mon Sep 17 00:00:00 2001 From: Martin Ashby Date: Thu, 28 Sep 2023 10:38:25 +0100 Subject: More work on conn.zig --- src/conn/conn.zig | 120 +++++++++++++++++++++++++----------------------------- 1 file changed, 55 insertions(+), 65 deletions(-) (limited to 'src/conn/conn.zig') diff --git a/src/conn/conn.zig b/src/conn/conn.zig index 037868e..a5a5ac6 100644 --- a/src/conn/conn.zig +++ b/src/conn/conn.zig @@ -34,6 +34,7 @@ pub fn connect(config: Config) !Conn { .unix => |path| try std.net.connectUnixSocket(path), }; var res = Conn{ + .allocator = allocator, .stream = stream, .config = config, }; @@ -41,6 +42,7 @@ pub fn connect(config: Config) !Conn { var writer = stream.writer(); var dr = diagnosticReader(100, stream.reader()); var reader = dr.reader(); + _ = reader; var params = SSHashMap.init(allocator); try params.put("user", config.user); if (config.database) |database| try params.put("database", database); @@ -50,16 +52,15 @@ pub fn connect(config: Config) !Conn { defer sm.deinit(allocator); try sm.write(allocator, writer); lp: while (true) { - var anymsg = try read_message(allocator, reader); + var anymsg = try res.receiveMessage(); defer anymsg.deinit(allocator); switch (anymsg) { - .ErrorResponse => |err| { - log.err("Error connecting to server {any}", .{err}); - return ServerError.ErrorResponse; + .ReadyForQuery => { + break :lp; }, .AuthenticationRequest => |ar| { switch (ar.inner_type) { - .AuthRequestTypeOk => {}, // fine do nothing! + .AuthRequestTypeOk => {}, .AuthRequestTypeCleartextPassword => { if (config.password) |password| { const pm = PasswordMessage{ .password = password }; @@ -70,60 +71,49 @@ pub fn connect(config: Config) !Conn { }, } }, - .ReadyForQuery => |rfq| { - // TODO do something about transaction state? - res.status = .connStatusIdle; - log.info("ready for query {any}", .{rfq}); - break :lp; - }, - .ParameterStatus => |ps| { - // TODO Handle this somehow? - log.info("ParameterStatus: {s}:{s}", .{ ps.name, ps.value }); - }, - .BackendKeyData => |bkd| { - log.info("BackendKeyData process_id {} secret_key {}", .{ bkd.process_id, bkd.secret_key }); - }, - else => |response_type| { - log.err("unhandled message type [{}]", .{response_type}); - const diag = try dr.get(allocator); - defer allocator.free(diag); - log.err("diag [{s}]", .{diag}); - return ProtocolError.WrongMessageType; + else => { + // deliberately do nothing, we must wait for ready before the connection can be used. }, } } return res; } +// Messages should always be received through this function. +// this'll handle generic stuff that should happen on the connection fn receiveMessage(self: *Conn) !BackendMessage { - var anymsg = try read_message(self., reader); - defer anymsg.deinit(allocator); + var anymsg = try read_message(self.allocator, self.stream.reader()); + errdefer anymsg.deinit(self.allocator); switch (anymsg) { - .ReadyForQuery => - pgConn.txStatus = msg.TxStatus - case *pgproto3.ParameterStatus: - pgConn.parameterStatuses[msg.Name] = msg.Value - case *pgproto3.ErrorResponse: - if msg.Severity == "FATAL" { - pgConn.status = connStatusClosed - pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return. - close(pgConn.cleanupDone) - return nil, ErrorResponseToPgError(msg) - } - case *pgproto3.NoticeResponse: - if pgConn.config.OnNotice != nil { - pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg)) - } - case *pgproto3.NotificationResponse: - if pgConn.config.OnNotification != nil { - pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload}) - } + .ReadyForQuery => { + // TODO handle TxStatus + }, + .ParameterStatus => { + // TODO handle parameter status + }, + .ErrorResponse => |err| { + if (std.mem.eql(u8, err.severity, "FATAL")) { + self.status = .connStatusClosed; + // TODO close the connection here? But it should really be the caller's responsiblilty + return ServerError.ErrorResponse; + } + }, + // .NoticeResponse => { + // // TODO handle notice response + // }, + // .NotificationResponse => { + // // TODO handle notificationResponse + // }, + .BackendKeyData => { + // TODO handle backend key data + }, + else => { + // deliberately do nothing, caller can presumably handle them. + }, } - - return msg, nil + return anymsg; } - pub fn deinit(self: *Conn) void { self.stream.close(); } @@ -131,24 +121,24 @@ pub fn deinit(self: *Conn) void { // How to handle this ... // The Go code relies on polymorphism to generically read any message type. // I _could_ have a tagged union type thing -pub const ResultIterator = struct { - conn: *Conn, - command_concluded: bool = false, - // NextRow advances the ResultReader to the next row and returns true if a row is available. - pub fn next_row(self: *ResultIterator) bool { - // TODO implement - var reader = self.conn.stream.reader(); - switch (try reader.readByte()) { - case - } - return false; - } -}; +// pub const ResultIterator = struct { +// conn: *Conn, +// command_concluded: bool = false, +// // NextRow advances the ResultReader to the next row and returns true if a row is available. +// pub fn next_row(self: *ResultIterator) bool { +// // TODO implement +// var reader = self.conn.stream.reader(); +// switch (try reader.readByte()) { +// case +// } +// return false; +// } +// }; -pub const MultiResultIterator = struct { - conn: *Conn, - fn next() ? -}; +// pub const MultiResultIterator = struct { +// conn: *Conn, +// fn next() ? +// }; // pub fn exec(self: *Conn) { -- cgit v1.2.3-ZIG