const std = @import("std"); const log = std.log.scoped(.pgz); const SSHashMap = std.StringHashMap([]const u8); const Config = @import("config.zig"); const StartupMessage = @import("startup_message.zig"); const ErrorResponse = @import("error_response.zig"); const AuthenticationRequest = @import("authentication_request.zig"); const ReadyForQuery = @import("ready_for_query.zig"); const ParameterStatus = @import("parameter_status.zig"); const BackendKeyData = @import("backend_key_data.zig"); const read_message = @import("main.zig").read_message; const ProtocolError = @import("main.zig").ProtocolError; const ServerError = @import("main.zig").ServerError; const diagnosticReader = @import("main.zig").diagnosticReader; const Conn = @This(); const ConnStatus = enum { connStatusUninitialized, connStatusConnecting, connStatusClosed, connStatusIdle, connStatusBusy, }; stream: std.net.Stream, config: Config, status: ConnStatus = .connStatusUninitialized, pub fn connect(config: Config) !Conn { const allocator = config.allocator; var stream = switch (config.address) { .net => |addr| try std.net.tcpConnectToAddress(addr), .unix => |path| try std.net.connectUnixSocket(path), }; var res = Conn{ .stream = stream, .config = config, }; errdefer res.deinit(); var writer = stream.writer(); var dr = diagnosticReader(10000, stream.reader()); var reader = dr.reader(); var params = SSHashMap.init(allocator); try params.put("user", config.user); if (config.database) |database| try params.put("database", database); var sm = StartupMessage{ .parameters = params, }; defer sm.deinit(allocator); try sm.write(allocator, writer); lp: while (true) { const response_type = try reader.readByte(); switch (response_type) { ErrorResponse.Tag => { var err = try read_message(ErrorResponse, allocator, reader); defer err.deinit(allocator); log.err("Error connecting to server {any}", .{err}); return ServerError.ErrorResponse; }, AuthenticationRequest.Tag => { var ar = try read_message(AuthenticationRequest, allocator, reader); defer ar.deinit(allocator); // TODO handle the authentication request log.info("authentication request", .{}); }, ReadyForQuery.Tag => { var rfq = try read_message(ReadyForQuery, allocator, reader); defer rfq.deinit(allocator); // TODO do something about transaction state? res.status = .connStatusIdle; log.info("ready for query", .{}); break :lp; }, ParameterStatus.Tag => { var ps = try read_message(ParameterStatus, allocator, reader); defer ps.deinit(allocator); // TODO Handle this somehow? log.info("ParameterStatus: {s}:{s}", .{ps.name, ps.value}); }, BackendKeyData.Tag =>{ var bkd = try read_message(BackendKeyData, allocator, reader); defer bkd.deinit(allocator); log.info("BackendKeyData process_id {} secret_key {}" , .{bkd.process_id, bkd.secret_key}); }, else => { log.err("unhandled message type [{c}]", .{response_type}); const diag = try dr.get(allocator); defer allocator.free(diag); log.err("diag [{s}]", .{diag}); return ProtocolError.WrongMessageType; }, } } return res; } fn deinit(self: *Conn) void { self.stream.close(); } test "connect" { // must have a local postgres runnning // TODO maybe use docker to start one? const allocator = std.testing.allocator; const cfg = Config{ .allocator = allocator, .address = .{.unix = "/run/postgresql/.s.PGSQL.5432"}, .database = "martin", .user = "martin", }; var conn = try Conn.connect(cfg); defer conn.deinit(); }