aboutsummaryrefslogtreecommitdiff
path: root/src/conn
diff options
context:
space:
mode:
authorMartin Ashby <martin@ashbysoft.com>2023-09-27 19:44:53 +0100
committerMartin Ashby <martin@ashbysoft.com>2023-09-27 19:44:53 +0100
commit4c5101d5f3a60c58190809166f1aa1eac2e7875f (patch)
tree3273264492d637b26642e36bdd3e898421390a5d /src/conn
parent10cbd00671f7144122f1d79a3ac1f67cbb9a51c6 (diff)
downloadpgz-4c5101d5f3a60c58190809166f1aa1eac2e7875f.tar.gz
pgz-4c5101d5f3a60c58190809166f1aa1eac2e7875f.tar.bz2
pgz-4c5101d5f3a60c58190809166f1aa1eac2e7875f.tar.xz
pgz-4c5101d5f3a60c58190809166f1aa1eac2e7875f.zip
Move conn and config to their own folders analogous to pgx
Diffstat (limited to 'src/conn')
-rw-r--r--src/conn/config.zig13
-rw-r--r--src/conn/conn.zig111
2 files changed, 124 insertions, 0 deletions
diff --git a/src/conn/config.zig b/src/conn/config.zig
new file mode 100644
index 0000000..b4e7cff
--- /dev/null
+++ b/src/conn/config.zig
@@ -0,0 +1,13 @@
+const std = @import("std");
+const SSHashMap = std.StringHashMap([]const u8);
+
+const Config = @This();
+
+allocator: std.mem.Allocator,
+address: union(enum){
+ net: std.net.Address,
+ unix: []const u8,
+},
+database: ?[]const u8 = null,
+user: []const u8,
+password: ?[]const u8 = null,
diff --git a/src/conn/conn.zig b/src/conn/conn.zig
new file mode 100644
index 0000000..1b2bf2d
--- /dev/null
+++ b/src/conn/conn.zig
@@ -0,0 +1,111 @@
+const std = @import("std");
+const log = std.log.scoped(.pgz);
+const SSHashMap = std.StringHashMap([]const u8);
+const Config = @import("config.zig");
+const Proto = @import("../proto/proto.zig");
+const read_message = @import("../main.zig").read_message;
+const ProtocolError = @import("../main.zig").ProtocolError;
+const ServerError = @import("../main.zig").ServerError;
+const diagnosticReader = @import("../main.zig").diagnosticReader;
+
+const Conn = @This();
+
+const ConnStatus = enum {
+ connStatusUninitialized,
+ connStatusConnecting,
+ connStatusClosed,
+ connStatusIdle,
+ connStatusBusy,
+};
+
+stream: std.net.Stream,
+config: Config,
+status: ConnStatus = .connStatusUninitialized,
+
+pub fn connect(config: Config) !Conn {
+ const allocator = config.allocator;
+ var stream = switch (config.address) {
+ .net => |addr| try std.net.tcpConnectToAddress(addr),
+ .unix => |path| try std.net.connectUnixSocket(path),
+ };
+ var res = Conn{
+ .stream = stream,
+ .config = config,
+ };
+ errdefer res.deinit();
+ var writer = stream.writer();
+ var dr = diagnosticReader(10000, stream.reader());
+ var reader = dr.reader();
+ var params = SSHashMap.init(allocator);
+ try params.put("user", config.user);
+ if (config.database) |database| try params.put("database", database);
+ var sm = Proto.StartupMessage{
+ .parameters = params,
+ };
+ defer sm.deinit(allocator);
+ try sm.write(allocator, writer);
+ lp: while (true) {
+ const response_type = try reader.readByte();
+ switch (response_type) {
+ Proto.ErrorResponse.Tag => {
+ var err = try read_message(Proto.ErrorResponse, allocator, reader);
+ defer err.deinit(allocator);
+ log.err("Error connecting to server {any}", .{err});
+ return ServerError.ErrorResponse;
+ },
+ Proto.AuthenticationRequest.Tag => {
+ var ar = try read_message(Proto.AuthenticationRequest, allocator, reader);
+ defer ar.deinit(allocator);
+ // TODO handle the authentication request
+ log.info("authentication request", .{});
+ },
+ Proto.ReadyForQuery.Tag => {
+ var rfq = try read_message(Proto.ReadyForQuery, allocator, reader);
+ defer rfq.deinit(allocator);
+ // TODO do something about transaction state?
+ res.status = .connStatusIdle;
+ log.info("ready for query", .{});
+ break :lp;
+ },
+ Proto.ParameterStatus.Tag => {
+ var ps = try read_message(Proto.ParameterStatus, allocator, reader);
+ defer ps.deinit(allocator);
+ // TODO Handle this somehow?
+ log.info("ParameterStatus: {s}:{s}", .{ ps.name, ps.value });
+ },
+ Proto.BackendKeyData.Tag => {
+ var bkd = try read_message(Proto.BackendKeyData, allocator, reader);
+ defer bkd.deinit(allocator);
+ log.info("BackendKeyData process_id {} secret_key {}", .{ bkd.process_id, bkd.secret_key });
+ },
+ else => {
+ log.err("unhandled message type [{c}]", .{response_type});
+ const diag = try dr.get(allocator);
+ defer allocator.free(diag);
+ log.err("diag [{s}]", .{diag});
+ return ProtocolError.WrongMessageType;
+ },
+ }
+ }
+ return res;
+}
+
+fn deinit(self: *Conn) void {
+ self.stream.close();
+}
+
+//pub fn exec(self: *Conn)
+
+test "connect" {
+ // must have a local postgres runnning
+ // TODO maybe use docker to start one?
+ const allocator = std.testing.allocator;
+ const cfg = Config{
+ .allocator = allocator,
+ .address = .{ .unix = "/run/postgresql/.s.PGSQL.5432" },
+ .database = "martin",
+ .user = "martin",
+ };
+ var conn = try Conn.connect(cfg);
+ defer conn.deinit();
+}