1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
const std = @import("std");
const log = std.log.scoped(.pgz);
const SSHashMap = std.StringHashMap([]const u8);
const Config = @import("config.zig");
const StartupMessage = @import("startup_message.zig");
const ErrorResponse = @import("error_response.zig");
const AuthenticationRequest = @import("authentication_request.zig");
const ReadyForQuery = @import("ready_for_query.zig");
const ParameterStatus = @import("parameter_status.zig");
const BackendKeyData = @import("backend_key_data.zig");
const read_message = @import("main.zig").read_message;
const ProtocolError = @import("main.zig").ProtocolError;
const ServerError = @import("main.zig").ServerError;
const diagnosticReader = @import("main.zig").diagnosticReader;
const Conn = @This();
/// Lifecycle states of a connection. Tag order is significant only in that it
/// fixes the underlying integer values; do not reorder.
const ConnStatus = enum {
    // Default value of the `status` field before any handshake has run.
    connStatusUninitialized,
    // NOTE(review): presumably "handshake in progress" — confirm intended use.
    connStatusConnecting,
    // NOTE(review): presumably "stream has been closed" — confirm intended use.
    connStatusClosed,
    // Assigned by `connect` once the server sends ReadyForQuery.
    connStatusIdle,
    // NOTE(review): presumably "a request is in flight" — confirm intended use.
    connStatusBusy,
};
/// Socket to the server (TCP or Unix domain, chosen by `config.address` in
/// `connect`). `deinit` closes it.
stream: std.net.Stream,
/// Copy of the configuration that was passed to `connect`.
config: Config,
/// Lifecycle state. Defaults to `.connStatusUninitialized`; `connect` sets it
/// to `.connStatusIdle` once the server reports ReadyForQuery.
status: ConnStatus = .connStatusUninitialized,
/// Opens a connection to the server described by `config` and runs the
/// startup handshake.
///
/// Sends a StartupMessage carrying `user` (and `database`, when set), then
/// consumes backend messages until ReadyForQuery arrives, at which point the
/// returned connection is marked `.connStatusIdle`.
///
/// Errors:
/// - `ServerError.ErrorResponse` when the server answers with an ErrorResponse.
/// - `ProtocolError.WrongMessageType` on a message tag this loop does not handle.
/// - any error produced while connecting, allocating, reading or writing.
///
/// On error the stream is closed before returning. On success the returned
/// connection holds the open stream; release it with `deinit`.
pub fn connect(config: Config) !Conn {
    const allocator = config.allocator;
    const stream = switch (config.address) {
        .net => |addr| try std.net.tcpConnectToAddress(addr),
        .unix => |path| try std.net.connectUnixSocket(path),
    };
    var res = Conn{
        .stream = stream,
        .config = config,
        .status = .connStatusConnecting,
    };
    // From here on every error path must close the socket.
    errdefer res.deinit();

    const writer = stream.writer();
    // Reads go through the diagnostic reader so the `else` branch below can
    // dump what was received when an unexpected message shows up.
    var dr = diagnosticReader(10000, stream.reader());
    const reader = dr.reader();

    // Build the startup parameters in their own scope: the errdefer releases
    // the map if a `put` fails, and is no longer armed once ownership has
    // moved into `sm` (which is cleaned up by `sm.deinit`).
    const params = blk: {
        var map = SSHashMap.init(allocator);
        errdefer map.deinit();
        try map.put("user", config.user);
        if (config.database) |database| try map.put("database", database);
        break :blk map;
    };
    var sm = StartupMessage{
        .parameters = params,
    };
    defer sm.deinit(allocator);
    try sm.write(allocator, writer);

    lp: while (true) {
        // Each backend message starts with a one-byte tag.
        const response_type = try reader.readByte();
        switch (response_type) {
            ErrorResponse.Tag => {
                var err = try read_message(ErrorResponse, allocator, reader);
                defer err.deinit(allocator);
                log.err("Error connecting to server {any}", .{err});
                return ServerError.ErrorResponse;
            },
            AuthenticationRequest.Tag => {
                var ar = try read_message(AuthenticationRequest, allocator, reader);
                defer ar.deinit(allocator);
                // TODO handle the authentication request
                log.info("authentication request", .{});
            },
            ReadyForQuery.Tag => {
                var rfq = try read_message(ReadyForQuery, allocator, reader);
                defer rfq.deinit(allocator);
                // TODO do something about transaction state?
                res.status = .connStatusIdle;
                log.info("ready for query", .{});
                // Handshake complete; this is the only successful exit.
                break :lp;
            },
            ParameterStatus.Tag => {
                var ps = try read_message(ParameterStatus, allocator, reader);
                defer ps.deinit(allocator);
                // TODO Handle this somehow?
                log.info("ParameterStatus: {s}:{s}", .{ ps.name, ps.value });
            },
            BackendKeyData.Tag => {
                var bkd = try read_message(BackendKeyData, allocator, reader);
                defer bkd.deinit(allocator);
                log.info("BackendKeyData process_id {} secret_key {}", .{ bkd.process_id, bkd.secret_key });
            },
            else => {
                log.err("unhandled message type [{c}]", .{response_type});
                const diag = try dr.get(allocator);
                defer allocator.free(diag);
                log.err("diag [{s}]", .{diag});
                return ProtocolError.WrongMessageType;
            },
        }
    }
    return res;
}
/// Closes the underlying stream and marks the connection as closed.
///
/// Public so that code outside this file, which can obtain a `Conn` through
/// the public `connect`, is able to release it. The close is unconditional,
/// so call this once per connection and do not use the connection afterwards.
pub fn deinit(self: *Conn) void {
    self.stream.close();
    self.status = .connStatusClosed;
}
test "connect" {
    // Integration test: needs a local PostgreSQL server listening on the
    // Unix socket below. When none is reachable the test is skipped instead
    // of failing, so the suite stays usable on machines without a server.
    // TODO maybe use docker to start one?
    const allocator = std.testing.allocator;
    const cfg = Config{
        .allocator = allocator,
        .address = .{ .unix = "/run/postgresql/.s.PGSQL.5432" },
        .database = "martin",
        .user = "martin",
    };
    var conn = Conn.connect(cfg) catch |err| switch (err) {
        // Socket file missing or nothing accepting on it: no server here.
        error.FileNotFound, error.ConnectionRefused => return error.SkipZigTest,
        // Anything else (protocol error, server error, ...) is a real failure.
        else => return err,
    };
    defer conn.deinit();
}
|