seekable_http_range.zig (7540B)
//! Reader/SeekableStream implementation of a resource over HTTP using Range requests.
//!
//! The resource is read through a fixed-size window: whenever the current
//! position falls outside the cached window, a `range: bytes=a-b` request is
//! issued to refill it starting at the current position.

const std = @import("std");

const SeekableHttpRange = @This();

/// Construction options for `init`.
pub const Opts = struct {
    /// Allocates the window buffer and is handed to every HTTP request.
    allocator: std.mem.Allocator,
    /// HTTP client used for all requests. Stored as a pointer, so it must
    /// outlive the returned `SeekableHttpRange`.
    client: *std.http.Client,
    /// Location of the resource. The slice is stored, not copied, so it must
    /// outlive the returned `SeekableHttpRange`.
    url: []const u8,
    /// Size in bytes of the read window. Must be non-zero.
    buffer_size: usize = 1024,
};

allocator: std.mem.Allocator,
client: *std.http.Client,
url: []const u8,
/// Current stream position, in bytes from the start of the resource.
pos: u64 = 0,
/// Length of the resource, taken from the `content-length` header in `init`.
endpos: u64,
/// Resource offset of `buffer[0]`, or null while nothing has been fetched yet.
buffer_pos: ?u64 = null,
/// Number of valid bytes at the start of `buffer`.
buffer_len: usize = 0,
/// Window storage, owned by this value and released by `deinit`.
buffer: []u8,

/// Issues a HEAD request for `opts.url` and prepares a stream over it.
///
/// Fails with:
/// - `error.InvalidBufferSize` when `opts.buffer_size` is zero,
/// - `error.HttpStatusError` when the response status is not 200 OK,
/// - `error.HttpRangeNotSpecified` / `error.HttpRangeNotSupported` /
///   `error.HttpRangeUnsupportedUnit` when `accept-ranges` is missing,
///   `none`, or anything other than `bytes`,
/// - `error.NoContentLength` / `error.ContentLengthFormatError` when
///   `content-length` is missing or not a decimal integer,
/// - any error of the underlying request or of the buffer allocation.
///
/// The caller must call `deinit` on the returned value.
pub fn init(opts: Opts) !SeekableHttpRange {
    // A zero-sized window can never hold a byte; reject it before any
    // network traffic instead of failing on the first read.
    if (opts.buffer_size == 0) return error.InvalidBufferSize;

    const a = opts.allocator;
    var res = try opts.client.fetch(a, .{
        .method = .HEAD,
        .location = .{ .url = opts.url },
        .headers = std.http.Headers{ .allocator = a, .owned = false },
    });
    defer res.deinit();
    if (res.status != .ok) return error.HttpStatusError;

    const accept_ranges = res.headers.getFirstValue("accept-ranges") orelse return error.HttpRangeNotSpecified;
    if (std.mem.eql(u8, accept_ranges, "none")) return error.HttpRangeNotSupported;
    if (!std.mem.eql(u8, accept_ranges, "bytes")) return error.HttpRangeUnsupportedUnit;

    const content_length_val = res.headers.getFirstValue("content-length") orelse return error.NoContentLength;
    const content_length = std.fmt.parseInt(u64, content_length_val, 10) catch return error.ContentLengthFormatError;

    // Last fallible step, so no errdefer is needed for the buffer.
    const buffer = try a.alloc(u8, opts.buffer_size);
    return .{
        .allocator = a,
        .client = opts.client,
        .url = opts.url,
        .buffer = buffer,
        .endpos = content_length,
    };
}

/// Frees the window buffer. The client and url are borrowed and left untouched.
pub fn deinit(self: *SeekableHttpRange) void {
    self.allocator.free(self.buffer);
}

/// Every error `read` can return. Most entries exist because the HTTP request
/// issued while refilling the window can fail in that way.
pub const ReadError = error{
    UnsupportedHeader,
    UnexpectedCharacter,
    InvalidFormat,
    InvalidPort,
    OutOfMemory,
    ConnectionRefused,
    NetworkUnreachable,
    ConnectionTimedOut,
    ConnectionResetByPeer,
    TemporaryNameServerFailure,
    NameServerFailure,
    UnknownHostName,
    HostLacksNetworkAddresses,
    UnexpectedConnectFailure,
    TlsInitializationFailed,
    UnsupportedUrlScheme,
    UnexpectedWriteFailure,
    InvalidContentLength,
    UnsupportedTransferEncoding,
    Overflow,
    InvalidCharacter,
    UriMissingHost,
    CertificateBundleLoadFailure,
    TlsFailure,
    TlsAlert,
    UnexpectedReadFailure,
    EndOfStream,
    HttpChunkInvalid,
    SystemResources,
    FileLocksNotSupported,
    Unexpected,
    AccessDenied,
    NotWriteable,
    MessageTooLong,
    Unseekable,
    InputOutput,
    IsDir,
    OperationAborted,
    BrokenPipe,
    NotOpenForReading,
    NetNameDeleted,
    WouldBlock,
    MessageNotCompleted,
    HttpHeadersExceededSizeLimit,
    HttpHeadersInvalid,
    HttpHeaderContinuationsUnsupported,
    HttpTransferEncodingUnsupported,
    HttpConnectionHeaderUnsupported,
    CompressionNotSupported,
    TooManyHttpRedirects,
    RedirectRequiresResend,
    HttpRedirectMissingLocation,
    CompressionInitializationFailed,
    DecompressionFailure,
    InvalidTrailers,
    StreamTooLong,
    DiskQuota,
    FileTooBig,
    NoSpaceLeft,
    DeviceBusy,
    InvalidArgument,
    NotOpenForWriting,
    LockViolation,
    HttpStatusError,
    HttpNoBody,
};

pub const Reader = std.io.Reader(*SeekableHttpRange, ReadError, read);

/// Returns a `std.io.Reader` view of this stream.
pub fn reader(self: *SeekableHttpRange) Reader {
    return .{ .context = self };
}

/// Copies bytes starting at the current position into `buffer` and advances
/// the position by the number of bytes copied.
///
/// Keeps going, refilling the window as often as needed, until `buffer` is
/// full or the end of the resource is reached. Returns the number of bytes
/// written, which is 0 only at end of resource or for an empty `buffer`.
pub fn read(self: *SeekableHttpRange, buffer: []u8) ReadError!usize {
    var n: usize = 0;
    while (n < buffer.len) {
        const available = try self.window();
        if (available.len == 0) break; // end of the resource
        const len = @min(available.len, buffer.len - n);
        @memcpy(buffer[n..][0..len], available[0..len]);
        self.pos += len;
        n += len;
    }
    return n;
}

/// Returns the cached bytes starting at the current position, refilling the
/// window first when the position is outside of it. The result is empty only
/// when the position is at or past the end of the resource.
fn window(self: *SeekableHttpRange) ![]const u8 {
    if (self.pos >= self.endpos) return self.buffer[0..0];

    if (self.buffer_pos) |start| {
        if (self.pos >= start and self.pos - start < self.buffer_len) {
            // Bounded by buffer_len, so the offset always fits in usize.
            const offset: usize = @intCast(self.pos - start);
            return self.buffer[offset..self.buffer_len];
        }
    }

    try self.refill();
    return self.buffer[0..self.buffer_len];
}

/// Fetches a fresh window starting at the current position.
/// Must only be called while `pos < endpos`. The cached window is replaced
/// only after the response has been validated.
fn refill(self: *SeekableHttpRange) !void {
    // max u64 formatted as decimal is 20 bytes long
    const range_buf_len = "bytes=-".len + 20 + 20;
    var range_buf: [range_buf_len]u8 = undefined;

    const window_end = @min(self.pos +| self.buffer.len, self.endpos);
    const wanted: usize = @intCast(window_end - self.pos);
    // Only possible with an empty buffer; `window_end - 1` below would
    // otherwise underflow at position 0.
    if (wanted == 0) return error.InvalidArgument;

    // Range request end is _inclusive_
    const range_value = std.fmt.bufPrint(&range_buf, "bytes={d}-{d}", .{ self.pos, window_end - 1 }) catch unreachable;

    var headers = std.http.Headers{ .allocator = self.allocator };
    defer headers.deinit();
    try headers.append("range", range_value);

    var res = try self.client.fetch(self.allocator, .{
        .location = .{ .url = self.url },
        .headers = headers,
    });
    defer res.deinit();
    if (res.status != .partial_content) return error.HttpStatusError;

    const body = res.body orelse return error.HttpNoBody;
    // An empty body would make no progress, and a body longer than the
    // requested range would overrun the window (or the resource end).
    if (body.len == 0) return error.EndOfStream;
    if (body.len > wanted) return error.StreamTooLong;

    // A short body is accepted: only the bytes actually received are marked
    // valid, so the next read past them triggers another request.
    @memcpy(self.buffer[0..body.len], body);
    self.buffer_pos = self.pos;
    self.buffer_len = body.len;
}

pub const SeekError = error{};
pub const GetSeekPosError = error{};
pub const SeekableStream = std.io.SeekableStream(*SeekableHttpRange, SeekError, GetSeekPosError, seekTo, seekBy, getPos, getEndPos);

/// Returns a `std.io.SeekableStream` view of this stream.
pub fn seekableStream(self: *SeekableHttpRange) SeekableStream {
    return .{ .context = self };
}

/// Moves to absolute position `pos`.
/// Copying from FixedBufferStream: clamps to the end of the resource rather
/// than returning an error.
pub fn seekTo(self: *SeekableHttpRange, pos: u64) SeekError!void {
    self.pos = @min(pos, self.endpos);
}

/// Moves the position by `delta` bytes.
/// Copying from FixedBufferStream: clamps to `[0, endpos]` rather than
/// returning an error.
pub fn seekBy(self: *SeekableHttpRange, delta: i64) SeekError!void {
    const target: u64 = if (delta < 0)
        // -(delta + 1) + 1 is |delta| without overflowing for minInt(i64).
        self.pos -| (@as(u64, @intCast(-(delta + 1))) + 1)
    else
        self.pos +| @as(u64, @intCast(delta));
    self.pos = @min(target, self.endpos);
}

/// Returns the current position.
pub fn getPos(self: *SeekableHttpRange) GetSeekPosError!u64 {
    return self.pos;
}

/// Returns the length of the resource.
pub fn getEndPos(self: *SeekableHttpRange) GetSeekPosError!u64 {
    return self.endpos;
}

// Requires network access to the URL below.
test "endBytes" {
    const a = std.testing.allocator;
    var client = std.http.Client{ .allocator = a };
    defer client.deinit();
    var range = try SeekableHttpRange.init(.{ .allocator = a, .client = &client, .url = "https://mfashby.net/posts.zip" });
    defer range.deinit();
    const ss = range.seekableStream();
    const rr = range.reader();

    const buf = try a.alloc(u8, 20);
    defer a.free(buf);

    try ss.seekTo(try ss.getEndPos() - 20);
    try rr.readNoEof(buf);
    try std.testing.expectEqualSlices(u8, &[_]u8{ 0x05, 0x06, 0x00, 0x00, 0x00, 0x00, 0x31, 0x00, 0x31, 0x00, 0xFD, 0x11, 0x00, 0x00, 0xEE, 0xB7, 0x00, 0x00, 0x00, 0x00 }, buf);

    try ss.seekBy(-300);
    try rr.readNoEof(buf);
    try std.testing.expectEqualSlices(u8, &[_]u8{ 0x00, 0x00, 0x00, 0x08, 0x00, 0xA1, 0x3A, 0x17, 0x57, 0x85, 0x9F, 0x53, 0xCE, 0x26, 0x05, 0x00, 0x00, 0x37, 0x0A, 0x00 }, buf);

    try ss.seekTo(0);
    try ss.seekBy(-1);
    try std.testing.expectEqual(@as(u64, 0), try ss.getPos());

    try ss.seekBy(std.math.minInt(i64));
    try std.testing.expectEqual(@as(u64, 0), try ss.getPos());

    try ss.seekTo(try ss.getEndPos());
    try ss.seekBy(1);
    try std.testing.expectEqual(try ss.getEndPos(), try ss.getPos());

    try ss.seekBy(std.math.maxInt(i64));
    try std.testing.expectEqual(try ss.getEndPos(), try ss.getPos());
}

// Offline: everything below is served from a pre-populated window.
test "seeking and cached reads need no network" {
    var storage = [_]u8{ 'a', 'b', 'c', 'd' };
    var range = SeekableHttpRange{
        .allocator = std.testing.allocator,
        .client = undefined, // never dereferenced: no refill is triggered
        .url = "",
        .endpos = storage.len,
        .buffer_pos = 0,
        .buffer_len = storage.len,
        .buffer = &storage,
    };

    var out: [8]u8 = undefined;
    try range.seekTo(1);
    const n = try range.read(&out);
    try std.testing.expectEqualSlices(u8, "bcd", out[0..n]);
    try std.testing.expectEqual(@as(usize, 0), try range.read(&out));

    try range.seekBy(-100);
    try std.testing.expectEqual(@as(u64, 0), range.pos);
    try range.seekBy(100);
    try std.testing.expectEqual(@as(u64, storage.len), range.pos);
}