diff options
author | Martin Ashby <martin@ashbysoft.com> | 2023-09-27 19:44:53 +0100 |
---|---|---|
committer | Martin Ashby <martin@ashbysoft.com> | 2023-09-27 19:44:53 +0100 |
commit | 4c5101d5f3a60c58190809166f1aa1eac2e7875f (patch) | |
tree | 3273264492d637b26642e36bdd3e898421390a5d /src/conn | |
parent | 10cbd00671f7144122f1d79a3ac1f67cbb9a51c6 (diff) | |
download | pgz-4c5101d5f3a60c58190809166f1aa1eac2e7875f.tar.gz pgz-4c5101d5f3a60c58190809166f1aa1eac2e7875f.tar.bz2 pgz-4c5101d5f3a60c58190809166f1aa1eac2e7875f.tar.xz pgz-4c5101d5f3a60c58190809166f1aa1eac2e7875f.zip |
Move conn and config to their own folders analogous to pgx
Diffstat (limited to 'src/conn')
-rw-r--r-- | src/conn/config.zig | 13 | ||||
-rw-r--r-- | src/conn/conn.zig | 111 |
2 files changed, 124 insertions, 0 deletions
diff --git a/src/conn/config.zig b/src/conn/config.zig new file mode 100644 index 0000000..b4e7cff --- /dev/null +++ b/src/conn/config.zig @@ -0,0 +1,13 @@ +const std = @import("std"); +const SSHashMap = std.StringHashMap([]const u8); + +const Config = @This(); + +allocator: std.mem.Allocator, +address: union(enum){ + net: std.net.Address, + unix: []const u8, +}, +database: ?[]const u8 = null, +user: []const u8, +password: ?[]const u8 = null, diff --git a/src/conn/conn.zig b/src/conn/conn.zig new file mode 100644 index 0000000..1b2bf2d --- /dev/null +++ b/src/conn/conn.zig @@ -0,0 +1,111 @@ +const std = @import("std"); +const log = std.log.scoped(.pgz); +const SSHashMap = std.StringHashMap([]const u8); +const Config = @import("config.zig"); +const Proto = @import("../proto/proto.zig"); +const read_message = @import("../main.zig").read_message; +const ProtocolError = @import("../main.zig").ProtocolError; +const ServerError = @import("../main.zig").ServerError; +const diagnosticReader = @import("../main.zig").diagnosticReader; + +const Conn = @This(); + +const ConnStatus = enum { + connStatusUninitialized, + connStatusConnecting, + connStatusClosed, + connStatusIdle, + connStatusBusy, +}; + +stream: std.net.Stream, +config: Config, +status: ConnStatus = .connStatusUninitialized, + +pub fn connect(config: Config) !Conn { + const allocator = config.allocator; + var stream = switch (config.address) { + .net => |addr| try std.net.tcpConnectToAddress(addr), + .unix => |path| try std.net.connectUnixSocket(path), + }; + var res = Conn{ + .stream = stream, + .config = config, + }; + errdefer res.deinit(); + var writer = stream.writer(); + var dr = diagnosticReader(10000, stream.reader()); + var reader = dr.reader(); + var params = SSHashMap.init(allocator); + try params.put("user", config.user); + if (config.database) |database| try params.put("database", database); + var sm = Proto.StartupMessage{ + .parameters = params, + }; + defer sm.deinit(allocator); + try sm.write(allocator, writer); + lp: while (true) { + const response_type = try reader.readByte(); + switch (response_type) { + Proto.ErrorResponse.Tag => { + var err = try read_message(Proto.ErrorResponse, allocator, reader); + defer err.deinit(allocator); + log.err("Error connecting to server {any}", .{err}); + return ServerError.ErrorResponse; + }, + Proto.AuthenticationRequest.Tag => { + var ar = try read_message(Proto.AuthenticationRequest, allocator, reader); + defer ar.deinit(allocator); + // TODO handle the authentication request + log.info("authentication request", .{}); + }, + Proto.ReadyForQuery.Tag => { + var rfq = try read_message(Proto.ReadyForQuery, allocator, reader); + defer rfq.deinit(allocator); + // TODO do something about transaction state? + res.status = .connStatusIdle; + log.info("ready for query", .{}); + break :lp; + }, + Proto.ParameterStatus.Tag => { + var ps = try read_message(Proto.ParameterStatus, allocator, reader); + defer ps.deinit(allocator); + // TODO Handle this somehow? + log.info("ParameterStatus: {s}:{s}", .{ ps.name, ps.value }); + }, + Proto.BackendKeyData.Tag => { + var bkd = try read_message(Proto.BackendKeyData, allocator, reader); + defer bkd.deinit(allocator); + log.info("BackendKeyData process_id {} secret_key {}", .{ bkd.process_id, bkd.secret_key }); + }, + else => { + log.err("unhandled message type [{c}]", .{response_type}); + const diag = try dr.get(allocator); + defer allocator.free(diag); + log.err("diag [{s}]", .{diag}); + return ProtocolError.WrongMessageType; + }, + } + } + return res; +} + +fn deinit(self: *Conn) void { + self.stream.close(); +} + +//pub fn exec(self: *Conn) + +test "connect" { + // must have a local postgres runnning + // TODO maybe use docker to start one? + const allocator = std.testing.allocator; + const cfg = Config{ + .allocator = allocator, + .address = .{ .unix = "/run/postgresql/.s.PGSQL.5432" }, + .database = "martin", + .user = "martin", + }; + var conn = try Conn.connect(cfg); + defer conn.deinit(); +} |