From 4c5101d5f3a60c58190809166f1aa1eac2e7875f Mon Sep 17 00:00:00 2001 From: Martin Ashby Date: Wed, 27 Sep 2023 19:44:53 +0100 Subject: Move conn and config to their own folders analogous to pgx --- src/config.zig | 13 ------ src/conn.zig | 111 ---------------------------------------------------- src/conn/config.zig | 13 ++++++ src/conn/conn.zig | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.zig | 4 +- 5 files changed, 126 insertions(+), 126 deletions(-) delete mode 100644 src/config.zig delete mode 100644 src/conn.zig create mode 100644 src/conn/config.zig create mode 100644 src/conn/conn.zig diff --git a/src/config.zig b/src/config.zig deleted file mode 100644 index b4e7cff..0000000 --- a/src/config.zig +++ /dev/null @@ -1,13 +0,0 @@ -const std = @import("std"); -const SSHashMap = std.StringHashMap([]const u8); - -const Config = @This(); - -allocator: std.mem.Allocator, -address: union(enum){ - net: std.net.Address, - unix: []const u8, -}, -database: ?[]const u8 = null, -user: []const u8, -password: ?[]const u8 = null, diff --git a/src/conn.zig b/src/conn.zig deleted file mode 100644 index f9f4fb5..0000000 --- a/src/conn.zig +++ /dev/null @@ -1,111 +0,0 @@ -const std = @import("std"); -const log = std.log.scoped(.pgz); -const SSHashMap = std.StringHashMap([]const u8); -const Config = @import("config.zig"); -const Proto = @import("proto/proto.zig"); -const read_message = @import("main.zig").read_message; -const ProtocolError = @import("main.zig").ProtocolError; -const ServerError = @import("main.zig").ServerError; -const diagnosticReader = @import("main.zig").diagnosticReader; - -const Conn = @This(); - -const ConnStatus = enum { - connStatusUninitialized, - connStatusConnecting, - connStatusClosed, - connStatusIdle, - connStatusBusy, -}; - -stream: std.net.Stream, -config: Config, -status: ConnStatus = .connStatusUninitialized, - -pub fn connect(config: Config) !Conn { - const allocator = config.allocator; - var stream = switch (config.address) { - .net => |addr| try std.net.tcpConnectToAddress(addr), - .unix => |path| try std.net.connectUnixSocket(path), - }; - var res = Conn{ - .stream = stream, - .config = config, - }; - errdefer res.deinit(); - var writer = stream.writer(); - var dr = diagnosticReader(10000, stream.reader()); - var reader = dr.reader(); - var params = SSHashMap.init(allocator); - try params.put("user", config.user); - if (config.database) |database| try params.put("database", database); - var sm = Proto.StartupMessage{ - .parameters = params, - }; - defer sm.deinit(allocator); - try sm.write(allocator, writer); - lp: while (true) { - const response_type = try reader.readByte(); - switch (response_type) { - Proto.ErrorResponse.Tag => { - var err = try read_message(Proto.ErrorResponse, allocator, reader); - defer err.deinit(allocator); - log.err("Error connecting to server {any}", .{err}); - return ServerError.ErrorResponse; - }, - Proto.AuthenticationRequest.Tag => { - var ar = try read_message(Proto.AuthenticationRequest, allocator, reader); - defer ar.deinit(allocator); - // TODO handle the authentication request - log.info("authentication request", .{}); - }, - Proto.ReadyForQuery.Tag => { - var rfq = try read_message(Proto.ReadyForQuery, allocator, reader); - defer rfq.deinit(allocator); - // TODO do something about transaction state? - res.status = .connStatusIdle; - log.info("ready for query", .{}); - break :lp; - }, - Proto.ParameterStatus.Tag => { - var ps = try read_message(Proto.ParameterStatus, allocator, reader); - defer ps.deinit(allocator); - // TODO Handle this somehow? - log.info("ParameterStatus: {s}:{s}", .{ ps.name, ps.value }); - }, - Proto.BackendKeyData.Tag => { - var bkd = try read_message(Proto.BackendKeyData, allocator, reader); - defer bkd.deinit(allocator); - log.info("BackendKeyData process_id {} secret_key {}", .{ bkd.process_id, bkd.secret_key }); - }, - else => { - log.err("unhandled message type [{c}]", .{response_type}); - const diag = try dr.get(allocator); - defer allocator.free(diag); - log.err("diag [{s}]", .{diag}); - return ProtocolError.WrongMessageType; - }, - } - } - return res; -} - -fn deinit(self: *Conn) void { - self.stream.close(); -} - -//pub fn exec(self: *Conn) - -test "connect" { - // must have a local postgres runnning - // TODO maybe use docker to start one? - const allocator = std.testing.allocator; - const cfg = Config{ - .allocator = allocator, - .address = .{ .unix = "/run/postgresql/.s.PGSQL.5432" }, - .database = "martin", - .user = "martin", - }; - var conn = try Conn.connect(cfg); - defer conn.deinit(); -} diff --git a/src/conn/config.zig b/src/conn/config.zig new file mode 100644 index 0000000..b4e7cff --- /dev/null +++ b/src/conn/config.zig @@ -0,0 +1,13 @@ +const std = @import("std"); +const SSHashMap = std.StringHashMap([]const u8); + +const Config = @This(); + +allocator: std.mem.Allocator, +address: union(enum){ + net: std.net.Address, + unix: []const u8, +}, +database: ?[]const u8 = null, +user: []const u8, +password: ?[]const u8 = null, diff --git a/src/conn/conn.zig b/src/conn/conn.zig new file mode 100644 index 0000000..1b2bf2d --- /dev/null +++ b/src/conn/conn.zig @@ -0,0 +1,111 @@ +const std = @import("std"); +const log = std.log.scoped(.pgz); +const SSHashMap = std.StringHashMap([]const u8); +const Config = @import("config.zig"); +const Proto = @import("../proto/proto.zig"); +const read_message = @import("../main.zig").read_message; +const ProtocolError = @import("../main.zig").ProtocolError; +const ServerError = @import("../main.zig").ServerError; +const diagnosticReader = @import("../main.zig").diagnosticReader; + +const Conn = @This(); + +const ConnStatus = enum { + connStatusUninitialized, + connStatusConnecting, + connStatusClosed, + connStatusIdle, + connStatusBusy, +}; + +stream: std.net.Stream, +config: Config, +status: ConnStatus = .connStatusUninitialized, + +pub fn connect(config: Config) !Conn { + const allocator = config.allocator; + var stream = switch (config.address) { + .net => |addr| try std.net.tcpConnectToAddress(addr), + .unix => |path| try std.net.connectUnixSocket(path), + }; + var res = Conn{ + .stream = stream, + .config = config, + }; + errdefer res.deinit(); + var writer = stream.writer(); + var dr = diagnosticReader(10000, stream.reader()); + var reader = dr.reader(); + var params = SSHashMap.init(allocator); + try params.put("user", config.user); + if (config.database) |database| try params.put("database", database); + var sm = Proto.StartupMessage{ + .parameters = params, + }; + defer sm.deinit(allocator); + try sm.write(allocator, writer); + lp: while (true) { + const response_type = try reader.readByte(); + switch (response_type) { + Proto.ErrorResponse.Tag => { + var err = try read_message(Proto.ErrorResponse, allocator, reader); + defer err.deinit(allocator); + log.err("Error connecting to server {any}", .{err}); + return ServerError.ErrorResponse; + }, + Proto.AuthenticationRequest.Tag => { + var ar = try read_message(Proto.AuthenticationRequest, allocator, reader); + defer ar.deinit(allocator); + // TODO handle the authentication request + log.info("authentication request", .{}); + }, + Proto.ReadyForQuery.Tag => { + var rfq = try read_message(Proto.ReadyForQuery, allocator, reader); + defer rfq.deinit(allocator); + // TODO do something about transaction state? + res.status = .connStatusIdle; + log.info("ready for query", .{}); + break :lp; + }, + Proto.ParameterStatus.Tag => { + var ps = try read_message(Proto.ParameterStatus, allocator, reader); + defer ps.deinit(allocator); + // TODO Handle this somehow? + log.info("ParameterStatus: {s}:{s}", .{ ps.name, ps.value }); + }, + Proto.BackendKeyData.Tag => { + var bkd = try read_message(Proto.BackendKeyData, allocator, reader); + defer bkd.deinit(allocator); + log.info("BackendKeyData process_id {} secret_key {}", .{ bkd.process_id, bkd.secret_key }); + }, + else => { + log.err("unhandled message type [{c}]", .{response_type}); + const diag = try dr.get(allocator); + defer allocator.free(diag); + log.err("diag [{s}]", .{diag}); + return ProtocolError.WrongMessageType; + }, + } + } + return res; +} + +fn deinit(self: *Conn) void { + self.stream.close(); +} + +//pub fn exec(self: *Conn) + +test "connect" { + // must have a local postgres runnning + // TODO maybe use docker to start one? + const allocator = std.testing.allocator; + const cfg = Config{ + .allocator = allocator, + .address = .{ .unix = "/run/postgresql/.s.PGSQL.5432" }, + .database = "martin", + .user = "martin", + }; + var conn = try Conn.connect(cfg); + defer conn.deinit(); +} diff --git a/src/main.zig b/src/main.zig index f5941b7..c8a9d8e 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,7 +1,5 @@ const std = @import("std"); const testing = std.testing; -const Conn = @import("conn.zig"); -const Proto = @import("proto/proto.zig"); pub const ProtocolError = error{ InvalidProtocolVersion, @@ -106,6 +104,8 @@ test "diagnostc reader" { } test { + const Conn = @import("conn/conn.zig"); + const Proto = @import("proto/proto.zig"); _ = Proto; _ = Conn; } -- cgit v1.2.3-ZIG