aboutsummaryrefslogtreecommitdiff
path: root/src/conn.zig
diff options
context:
space:
mode:
authorMartin Ashby <martin@ashbysoft.com>2023-09-23 21:26:06 +0100
committerMartin Ashby <martin@ashbysoft.com>2023-09-23 21:26:06 +0100
commitddc6bee3757d3e68a14fafdc47eb5d0a0ba923bb (patch)
tree309ffbc89c42a3adf41a655025fe20e4014878c9 /src/conn.zig
parent5a91b37ee7dd36db52dfde1727b780ec3fa4c67d (diff)
downloadpgz-ddc6bee3757d3e68a14fafdc47eb5d0a0ba923bb.tar.gz
pgz-ddc6bee3757d3e68a14fafdc47eb5d0a0ba923bb.tar.bz2
pgz-ddc6bee3757d3e68a14fafdc47eb5d0a0ba923bb.tar.xz
pgz-ddc6bee3757d3e68a14fafdc47eb5d0a0ba923bb.zip
Got a working test connecting to postgres server and reading data.
Added a couple more message types to facilitate this.
Diffstat (limited to 'src/conn.zig')
-rw-r--r--src/conn.zig94
1 files changed, 80 insertions, 14 deletions
diff --git a/src/conn.zig b/src/conn.zig
index 870ab01..36c89df 100644
--- a/src/conn.zig
+++ b/src/conn.zig
@@ -1,10 +1,17 @@
const std = @import("std");
+const log = std.log.scoped(.pgz);
const SSHashMap = std.StringHashMap([]const u8);
const Config = @import("config.zig");
const StartupMessage = @import("startup_message.zig");
-const AuthenticationOk = @import("authentication_ok.zig");
-const AuthenticationCleartextPassword = @import("authentication_cleartext_password.zig");
const ErrorResponse = @import("error_response.zig");
+const AuthenticationRequest = @import("authentication_request.zig");
+const ReadyForQuery = @import("ready_for_query.zig");
+const ParameterStatus = @import("parameter_status.zig");
+const BackendKeyData = @import("backend_key_data.zig");
+const read_message = @import("main.zig").read_message;
+const ProtocolError = @import("main.zig").ProtocolError;
+const ServerError = @import("main.zig").ServerError;
+const diagnosticReader = @import("main.zig").diagnosticReader;
const Conn = @This();
@@ -18,7 +25,7 @@ const ConnStatus = enum {
stream: std.net.Stream,
config: Config,
-status: ConnStatus,
+status: ConnStatus = .connStatusUninitialized,
pub fn connect(config: Config) !Conn {
const allocator = config.allocator;
@@ -26,24 +33,83 @@ pub fn connect(config: Config) !Conn {
.net => |addr| try std.net.tcpConnectToAddress(addr),
.unix => |path| try std.net.connectUnixSocket(path),
};
+ var res = Conn{
+ .stream = stream,
+ .config = config,
+ };
+ errdefer res.deinit();
var writer = stream.writer();
-
- errdefer stream.close();
+ var dr = diagnosticReader(10000, stream.reader());
+ var reader = dr.reader();
var params = SSHashMap.init(allocator);
- errdefer params.deinit();
try params.put("user", config.user);
- if (config.database) |database| try params.put(database);
+ if (config.database) |database| try params.put("database", database);
var sm = StartupMessage{
.parameters = params,
};
defer sm.deinit(allocator);
try sm.write(allocator, writer);
+ lp: while (true) {
+ const response_type = try reader.readByte();
+ switch (response_type) {
+ ErrorResponse.Tag => {
+ var err = try read_message(ErrorResponse, allocator, reader);
+ defer err.deinit(allocator);
+ log.err("Error connecting to server {any}", .{err});
+ return ServerError.ErrorResponse;
+ },
+ AuthenticationRequest.Tag => {
+ var ar = try read_message(AuthenticationRequest, allocator, reader);
+ defer ar.deinit(allocator);
+ // TODO handle the authentication request
+ log.info("authentication request", .{});
+ },
+ ReadyForQuery.Tag => {
+ var rfq = try read_message(ReadyForQuery, allocator, reader);
+ defer rfq.deinit(allocator);
+ // TODO do something about transaction state?
+ res.status = .connStatusIdle;
+ log.info("ready for query", .{});
+ break :lp;
+ },
+ ParameterStatus.Tag => {
+ var ps = try read_message(ParameterStatus, allocator, reader);
+ defer ps.deinit(allocator);
+ // TODO Handle this somehow?
+ log.info("ParameterStatus: {s}:{s}", .{ps.name, ps.value});
+ },
+ BackendKeyData.Tag =>{
+ var bkd = try read_message(BackendKeyData, allocator, reader);
+ defer bkd.deinit(allocator);
+ log.info("BackendKeyData process_id {} secret_key {}" , .{bkd.process_id, bkd.secret_key});
+ },
+ else => {
+ log.err("unhandled message type [{c}]", .{response_type});
+ const diag = try dr.get(allocator);
+ defer allocator.free(diag);
+ log.err("diag [{s}]", .{diag});
+ return ProtocolError.WrongMessageType;
+ },
+ }
+ }
+ return res;
+}
+
+fn deinit(self: *Conn) void {
+ self.stream.close();
+}
+
+test "connect" {
+ // must have a local postgres runnning
+ // TODO maybe use docker to start one?
+ const allocator = std.testing.allocator;
+ const cfg = Config{
+ .allocator = allocator,
+ .address = .{.unix = "/run/postgresql/.s.PGSQL.5432"},
+ .database = "martin",
+ .user = "martin",
+ };
+ var conn = try Conn.connect(cfg);
+ defer conn.deinit();
}
-const StartupMessageResponseType = enum(u8) {
- ErrorResponse = 'E',
- AuthenticationResponse = AuthenticationOk.Tag, // All the authentication responses share a message type and must be decoded by the next field
-};
-const StartupMessageResponse = union(StartupMessageResponseType) {
- error: ErrorResponse,
-}; \ No newline at end of file