comparison event_log.c @ 1958:9c01945b5d20

Use zlib to compress event log streams
author Mike Pavone <pavone@retrodev.com>
date Sat, 02 May 2020 17:33:23 -0700
parents ba06346611a1
children bd70f1e15684
comparison
equal deleted inserted replaced
1957:ba06346611a1 1958:9c01945b5d20
13 #include <errno.h> 13 #include <errno.h>
14 #include "event_log.h" 14 #include "event_log.h"
15 #include "util.h" 15 #include "util.h"
16 #include "blastem.h" 16 #include "blastem.h"
17 #include "saves.h" 17 #include "saves.h"
18 #include "zlib/zlib.h"
18 19
19 enum { 20 enum {
20 CMD_GAMEPAD_DOWN, 21 CMD_GAMEPAD_DOWN,
21 CMD_GAMEPAD_UP, 22 CMD_GAMEPAD_UP,
22 }; 23 };
23 24
24 static uint8_t active, fully_active; 25 static uint8_t active, fully_active;
25 static FILE *event_file; 26 static FILE *event_file;
26 static serialize_buffer buffer; 27 static serialize_buffer buffer;
28 static uint8_t *compressed;
29 static size_t compressed_storage;
30 static z_stream output_stream;
31 static uint32_t last;
32
33 static void event_log_common_init(void)
34 {
35 init_serialize(&buffer);
36 compressed_storage = 128*1024;
37 compressed = malloc(compressed_storage);
38 deflateInit(&output_stream, 9);
39 output_stream.avail_out = compressed_storage;
40 output_stream.next_out = compressed;
41 output_stream.avail_in = 0;
42 output_stream.next_in = buffer.data;
43 last = 0;
44 active = 1;
45 }
46
47 static uint8_t multi_count;
48 static size_t multi_start;
49 static void finish_multi(void)
50 {
51 buffer.data[multi_start] |= multi_count - 2;
52 multi_count = 0;
53 }
54
55 static void file_finish(void)
56 {
57 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
58 output_stream.next_out = compressed;
59 output_stream.avail_out = compressed_storage;
60 int result = deflate(&output_stream, Z_FINISH);
61 if (Z_STREAM_END != result) {
62 fatal_error("Final deflate call returned %d\n", result);
63 }
64 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
65 fclose(event_file);
66 }
27 67
28 static const char el_ident[] = "BLSTEL\x02\x00"; 68 static const char el_ident[] = "BLSTEL\x02\x00";
29 static uint32_t last;
30 void event_log_file(char *fname) 69 void event_log_file(char *fname)
31 { 70 {
32 event_file = fopen(fname, "wb"); 71 event_file = fopen(fname, "wb");
33 if (!event_file) { 72 if (!event_file) {
34 warning("Failed to open event file %s for writing\n", fname); 73 warning("Failed to open event file %s for writing\n", fname);
35 return; 74 return;
36 } 75 }
37 fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file); 76 fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file);
38 init_serialize(&buffer); 77 event_log_common_init();
39 active = fully_active = 1; 78 fully_active = 1;
40 last = 0; 79 atexit(file_finish);
41 } 80 }
42 81
43 static int listen_sock, remotes[7]; 82 static int listen_sock, remotes[7];
44 static int num_remotes; 83 static int num_remotes;
45 void event_log_tcp(char *address, char *port) 84 void event_log_tcp(char *address, char *port)
68 warning("Failed to listen for event log remotes on %s:%s\n", address, port); 107 warning("Failed to listen for event log remotes on %s:%s\n", address, port);
69 socket_close(listen_sock); 108 socket_close(listen_sock);
70 goto cleanup_address; 109 goto cleanup_address;
71 } 110 }
72 socket_blocking(listen_sock, 0); 111 socket_blocking(listen_sock, 0);
73 active = 1; 112 event_log_common_init();
74 cleanup_address: 113 cleanup_address:
75 freeaddrinfo(result); 114 freeaddrinfo(result);
76 } 115 }
77 116
78 static uint8_t *system_start; 117 static uint8_t *system_start;
88 if (name_len > 255) { 127 if (name_len > 255) {
89 name_len = 255; 128 name_len = 255;
90 } 129 }
91 save_int8(&buffer, name_len); 130 save_int8(&buffer, name_len);
92 save_buffer8(&buffer, name, strlen(name)); 131 save_buffer8(&buffer, name, strlen(name));
93 if (!fully_active) { 132 if (listen_sock) {
94 system_start = malloc(buffer.size); 133 system_start = malloc(buffer.size);
95 system_start_size = buffer.size; 134 system_start_size = buffer.size;
96 memcpy(system_start, buffer.data, buffer.size); 135 memcpy(system_start, buffer.data, buffer.size);
97 buffer.size = 0; 136 } else {
98 } 137 //system start header is never compressed, so write to file immediately
138 fwrite(buffer.data, 1, buffer.size, event_file);
139 }
140 buffer.size = 0;
99 } 141 }
100 142
101 //header formats 143 //header formats
102 //Single byte: 4 bit type, 4 bit delta (16-31) 144 //Single byte: 4 bit type, 4 bit delta (16-31)
103 //Three Byte: 8 bit type, 16-bit delta 145 //Three Byte: 8 bit type, 16-bit delta
104 //Four byte: 8-bit type, 24-bit signed delta 146 //Four byte: 8-bit type, 24-bit signed delta
105 #define FORMAT_3BYTE 0xE0 147 #define FORMAT_3BYTE 0xE0
106 #define FORMAT_4BYTE 0xF0 148 #define FORMAT_4BYTE 0xF0
107 static uint8_t last_event_type = 0xFF; 149 static uint8_t last_event_type = 0xFF;
108 static uint32_t last_delta; 150 static uint32_t last_delta;
109 static uint8_t multi_count;
110 static size_t multi_start;
111 static void event_header(uint8_t type, uint32_t cycle) 151 static void event_header(uint8_t type, uint32_t cycle)
112 { 152 {
113 uint32_t delta = cycle - last; 153 uint32_t delta = cycle - last;
114 if (multi_count) { 154 if (multi_count) {
115 if (type != last_event_type || delta != last_delta) { 155 if (type != last_event_type || delta != last_delta) {
116 buffer.data[multi_start] |= multi_count - 2; 156 finish_multi();
117 multi_count = 0;
118 } else { 157 } else {
119 ++multi_count; 158 ++multi_count;
120 if (multi_count == 17) { 159 if (multi_count == 17) {
121 buffer.data[multi_start] |= multi_count - 2; 160 finish_multi();
122 last_event_type = 0xFF; 161 last_event_type = 0xFF;
123 multi_count = 0;
124 } 162 }
125 return; 163 return;
126 } 164 }
127 } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) { 165 } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) {
128 //make some room 166 //make some room
157 event_header(EVENT_ADJUST, cycle); 195 event_header(EVENT_ADJUST, cycle);
158 last = cycle - deduction; 196 last = cycle - deduction;
159 save_int32(&buffer, deduction); 197 save_int32(&buffer, deduction);
160 } 198 }
161 199
162 static size_t remote_send_progress[7]; 200 static uint8_t *remote_send_progress[7];
163 static uint8_t remote_needs_state[7]; 201 static uint8_t remote_needs_state[7];
164 static void flush_socket(void) 202 static void flush_socket(void)
165 { 203 {
166 int remote = accept(listen_sock, NULL, NULL); 204 int remote = accept(listen_sock, NULL, NULL);
167 if (remote != -1) { 205 if (remote != -1) {
172 remotes[num_remotes] = remote; 210 remotes[num_remotes] = remote;
173 remote_needs_state[num_remotes++] = 1; 211 remote_needs_state[num_remotes++] = 1;
174 current_system->save_state = EVENTLOG_SLOT + 1; 212 current_system->save_state = EVENTLOG_SLOT + 1;
175 } 213 }
176 } 214 }
177 size_t min_progress = 0; 215 uint8_t *min_progress = compressed;
178 for (int i = 0; i < num_remotes; i++) { 216 for (int i = 0; i < num_remotes; i++) {
179 int sent = 1; 217 int sent = 1;
180 if (remote_needs_state[i]) { 218 if (remote_needs_state[i]) {
181 remote_send_progress[i] = buffer.size; 219 remote_send_progress[i] = output_stream.next_out;
182 } else { 220 } else {
183 uint8_t buffer[1500]; 221 uint8_t buffer[1500];
184 int bytes = recv(remotes[i], buffer, sizeof(buffer), 0); 222 int bytes = recv(remotes[i], buffer, sizeof(buffer), 0);
185 for (int j = 0; j < bytes; j++) 223 for (int j = 0; j < bytes; j++)
186 { 224 {
208 warning("Unrecognized remote command %X\n", cmd); 246 warning("Unrecognized remote command %X\n", cmd);
209 j = bytes; 247 j = bytes;
210 } 248 }
211 } 249 }
212 } 250 }
213 while (sent && buffer.size - remote_send_progress[i]) 251 while (sent && output_stream.next_out > remote_send_progress[i])
214 { 252 {
215 sent = send(remotes[i], buffer.data + remote_send_progress[i], buffer.size - remote_send_progress[i], 0); 253 sent = send(remotes[i], remote_send_progress[i], output_stream.next_out - remote_send_progress[i], 0);
216 if (sent >= 0) { 254 if (sent >= 0) {
217 remote_send_progress[i] += sent; 255 remote_send_progress[i] += sent;
218 } else if (socket_error_is_wouldblock()) { 256 } else if (socket_error_is_wouldblock()) {
219 socket_close(remotes[i]); 257 socket_close(remotes[i]);
220 remotes[i] = remotes[num_remotes-1]; 258 remotes[i] = remotes[num_remotes-1];
227 if (remote_send_progress[i] > min_progress) { 265 if (remote_send_progress[i] > min_progress) {
228 min_progress = remote_send_progress[i]; 266 min_progress = remote_send_progress[i];
229 } 267 }
230 } 268 }
231 } 269 }
232 if (min_progress == buffer.size) { 270 if (min_progress == output_stream.next_out) {
233 buffer.size = 0; 271 output_stream.next_out = compressed;
234 memset(remote_send_progress, 0, sizeof(remote_send_progress)); 272 output_stream.avail_out = compressed_storage;
235 multi_count = 0; 273 for (int i = 0; i < num_remotes; i++) {
236 last_event_type = 0xFF; 274 remote_send_progress[i] = compressed;
275 }
237 } 276 }
238 } 277 }
239 278
240 void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload) 279 void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload)
241 { 280 {
243 return; 282 return;
244 } 283 }
245 event_header(type, cycle); 284 event_header(type, cycle);
246 last = cycle; 285 last = cycle;
247 save_buffer8(&buffer, payload, size); 286 save_buffer8(&buffer, payload, size);
248 if (listen_sock && buffer.size > 1280) { 287 if (!multi_count) {
249 if (multi_count) { 288 last_event_type = 0xFF;
250 buffer.data[multi_start] |= multi_count - 2; 289 output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
251 multi_count = 0; 290 int result = deflate(&output_stream, Z_NO_FLUSH);
252 last_event_type = 0xFF; 291 if (result != Z_OK) {
253 } 292 fatal_error("deflate returned %d\n", result);
254 flush_socket(); 293 }
294 if (listen_sock) {
295 if ((output_stream.next_out - compressed) > 1280 || !output_stream.avail_out) {
296 flush_socket();
297 }
298 } else if (!output_stream.avail_out) {
299 fwrite(compressed, 1, compressed_storage, event_file);
300 output_stream.next_out = compressed;
301 output_stream.avail_out = compressed_storage;
302 }
303 if (!output_stream.avail_in) {
304 buffer.size = 0;
305 output_stream.next_in = buffer.data;
306 }
255 } 307 }
256 } 308 }
257 309
258 static uint32_t last_word_address; 310 static uint32_t last_word_address;
259 void event_vram_word(uint32_t cycle, uint32_t address, uint16_t value) 311 void event_vram_word(uint32_t cycle, uint32_t address, uint16_t value)
298 } 350 }
299 } 351 }
300 return total; 352 return total;
301 } 353 }
302 354
355 void deflate_flush(uint8_t full)
356 {
357 output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
358 while (output_stream.avail_in)
359 {
360 if (!output_stream.avail_out) {
361 size_t old_storage = compressed_storage;
362 uint8_t *old_compressed = compressed;
363 compressed_storage *= 2;
364 compressed = realloc(compressed, compressed_storage);
365 output_stream.next_out = compressed + old_storage;
366 output_stream.avail_out = old_storage;
367 for (int i = 0; i < num_remotes; i++) {
368 if (!remote_needs_state[i]) {
369 remote_send_progress[i] = compressed + (remote_send_progress[i] - old_compressed);
370 }
371 }
372 }
373 int result = deflate(&output_stream, full ? Z_FINISH : Z_SYNC_FLUSH);
374 if (result != (full ? Z_STREAM_END : Z_OK)) {
375 fatal_error("deflate returned %d\n", result);
376 }
377 if (full) {
378 result = deflateReset(&output_stream);
379 if (result != Z_OK) {
380 fatal_error("deflateReset returned %d\n", result);
381 }
382 }
383 }
384 output_stream.next_in = buffer.data;
385 buffer.size = 0;
386 }
387
303 void event_state(uint32_t cycle, serialize_buffer *state) 388 void event_state(uint32_t cycle, serialize_buffer *state)
304 { 389 {
305 if (!fully_active) { 390 if (!fully_active) {
306 last = cycle; 391 last = cycle;
307 } 392 }
309 EVENT_STATE << 4, last >> 24, last >> 16, last >> 8, last, 394 EVENT_STATE << 4, last >> 24, last >> 16, last >> 8, last,
310 last_word_address >> 16, last_word_address >> 8, last_word_address, 395 last_word_address >> 16, last_word_address >> 8, last_word_address,
311 last_byte_address >> 8, last_byte_address, 396 last_byte_address >> 8, last_byte_address,
312 state->size >> 16, state->size >> 8, state->size 397 state->size >> 16, state->size >> 8, state->size
313 }; 398 };
399 uint8_t sent_system_start = 0;
314 for (int i = 0; i < num_remotes; i++) 400 for (int i = 0; i < num_remotes; i++)
315 { 401 {
316 if (remote_needs_state[i]) { 402 if (remote_needs_state[i]) {
317 if( 403 if (send_all(remotes[i], system_start, system_start_size, 0) == system_start_size) {
318 send_all(remotes[i], system_start, system_start_size, 0) == system_start_size 404 sent_system_start = 1;
319 && send_all(remotes[i], header, sizeof(header), 0) == sizeof(header)
320 && send_all(remotes[i], state->data, state->size, 0) == state->size
321 ) {
322 remote_send_progress[i] = buffer.size;
323 remote_needs_state[i] = 0;
324 socket_blocking(remotes[i], 0);
325 int flag = 1;
326 setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
327 fully_active = 1;
328 } else { 405 } else {
329 socket_close(remotes[i]); 406 socket_close(remotes[i]);
330 remotes[i] = remotes[num_remotes-1]; 407 remotes[i] = remotes[num_remotes-1];
331 remote_send_progress[i] = remote_send_progress[num_remotes-1]; 408 remote_send_progress[i] = remote_send_progress[num_remotes-1];
332 remote_needs_state[i] = remote_needs_state[num_remotes-1]; 409 remote_needs_state[i] = remote_needs_state[num_remotes-1];
333 num_remotes--; 410 num_remotes--;
334 i--; 411 i--;
335 } 412 }
336 } 413 }
337 } 414 }
415 if (sent_system_start) {
416 if (fully_active) {
417 if (multi_count) {
418 finish_multi();
419 }
420 //full flush is needed so new and old clients can share a stream
421 deflate_flush(1);
422 }
423 save_buffer8(&buffer, header, sizeof(header));
424 save_buffer8(&buffer, state->data, state->size);
425 size_t old_compressed_size = output_stream.next_out - compressed;
426 deflate_flush(1);
427 size_t state_size = output_stream.next_out - compressed - old_compressed_size;
428 for (int i = 0; i < num_remotes; i++) {
429 if (remote_needs_state[i]) {
430 if (send_all(remotes[i], compressed + old_compressed_size, state_size, 0) == state_size) {
431 remote_send_progress[i] = compressed + old_compressed_size;
432 remote_needs_state[i] = 0;
433 socket_blocking(remotes[i], 0);
434 int flag = 1;
435 setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
436 fully_active = 1;
437 } else {
438 socket_close(remotes[i]);
439 remotes[i] = remotes[num_remotes-1];
440 remote_send_progress[i] = remote_send_progress[num_remotes-1];
441 remote_needs_state[i] = remote_needs_state[num_remotes-1];
442 num_remotes--;
443 i--;
444 }
445 }
446 }
447 output_stream.next_out = compressed + old_compressed_size;
448 output_stream.avail_out = compressed_storage - old_compressed_size;
449 }
338 } 450 }
339 451
340 void event_flush(uint32_t cycle) 452 void event_flush(uint32_t cycle)
341 { 453 {
342 if (!active) { 454 if (!active) {
343 return; 455 return;
344 } 456 }
345 if (fully_active) { 457 if (fully_active) {
346 event_log(EVENT_FLUSH, cycle, 0, NULL); 458 event_header(EVENT_FLUSH, cycle);
459 last = cycle;
460
461 deflate_flush(0);
347 } 462 }
348 if (event_file) { 463 if (event_file) {
349 fwrite(buffer.data, 1, buffer.size, event_file); 464 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
350 fflush(event_file); 465 fflush(event_file);
351 buffer.size = 0; 466 output_stream.next_out = compressed;
352 multi_count = 0; 467 output_stream.avail_out = compressed_storage;
353 last_event_type = 0xFF;
354 } else if (listen_sock) { 468 } else if (listen_sock) {
355 flush_socket(); 469 flush_socket();
356 } 470 }
471 }
472
473 static void init_event_reader_common(event_reader *reader)
474 {
475 reader->last_cycle = 0;
476 reader->repeat_event = 0xFF;
477 reader->storage = 512 * 1024;
478 init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage);
479 reader->buffer.size = 0;
480 memset(&reader->input_stream, 0, sizeof(reader->input_stream));
481
357 } 482 }
358 483
359 void init_event_reader(event_reader *reader, uint8_t *data, size_t size) 484 void init_event_reader(event_reader *reader, uint8_t *data, size_t size)
360 { 485 {
361 reader->socket = 0; 486 reader->socket = 0;
362 reader->last_cycle = 0; 487 reader->last_cycle = 0;
363 reader->repeat_event = 0xFF; 488 reader->repeat_event = 0xFF;
364 init_deserialize(&reader->buffer, data, size); 489 init_event_reader_common(reader);
490 uint8_t name_len = data[1];
491 reader->buffer.size = name_len + 2;
492 memcpy(reader->buffer.data, data, reader->buffer.size);
493 reader->input_stream.next_in = data + reader->buffer.size;
494 reader->input_stream.avail_in = size - reader->buffer.size;
495
496 int result = inflateInit(&reader->input_stream);
497 if (Z_OK != result) {
498 fatal_error("inflateInit returned %d\n", result);
499 }
500 reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
501 reader->input_stream.avail_out = reader->storage - reader->buffer.size;
502 result = inflate(&reader->input_stream, Z_NO_FLUSH);
503 if (Z_OK != result && Z_STREAM_END != result) {
504 fatal_error("inflate returned %d\n", result);
505 }
506 reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
365 } 507 }
366 508
367 void init_event_reader_tcp(event_reader *reader, char *address, char *port) 509 void init_event_reader_tcp(event_reader *reader, char *address, char *port)
368 { 510 {
369 struct addrinfo request, *result; 511 struct addrinfo request, *result;
380 } 522 }
381 if (connect(reader->socket, result->ai_addr, result->ai_addrlen) < 0) { 523 if (connect(reader->socket, result->ai_addr, result->ai_addrlen) < 0) {
382 fatal_error("Failed to connect to %s:%s for event log stream\n", address, port); 524 fatal_error("Failed to connect to %s:%s for event log stream\n", address, port);
383 } 525 }
384 526
385 reader->storage = 512 * 1024; 527 init_event_reader_common(reader);
386 reader->last_cycle = 0; 528 reader->socket_buffer_size = 256 * 1024;
387 init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage); 529 reader->socket_buffer = malloc(reader->socket_buffer_size);
388 reader->buffer.size = 0; 530
389 while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2]) 531 while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2])
390 { 532 {
391 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); 533 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
392 if (bytes < 0) { 534 if (bytes < 0) {
393 fatal_error("Failed to receive system init from %s:%s\n", address, port); 535 fatal_error("Failed to receive system init from %s:%s\n", address, port);
394 } 536 }
395 reader->buffer.size += bytes; 537 reader->buffer.size += bytes;
396 } 538 }
539 size_t init_msg_len = 3 + reader->buffer.data[2];
540 memcpy(reader->socket_buffer, reader->buffer.data + init_msg_len, reader->buffer.size - init_msg_len);
541 reader->input_stream.next_in = reader->socket_buffer;
542 reader->input_stream.avail_in = reader->buffer.size - init_msg_len;
543 reader->buffer.size = init_msg_len;
544 int res = inflateInit(&reader->input_stream);
545 if (Z_OK != res) {
546 fatal_error("inflateInit returned %d\n", res);
547 }
548 reader->input_stream.next_out = reader->buffer.data + init_msg_len;
549 reader->input_stream.avail_out = reader->storage - init_msg_len;
550 res = inflate(&reader->input_stream, Z_NO_FLUSH);
551 if (Z_OK != res && Z_BUF_ERROR != res) {
552 fatal_error("inflate returned %d in init_event_reader_tcp\n", res);
553 }
397 socket_blocking(reader->socket, 0); 554 socket_blocking(reader->socket, 0);
398 int flag = 1; 555 int flag = 1;
399 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); 556 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
400 } 557 }
401 558
402 static void read_from_socket(event_reader *reader) 559 static void read_from_socket(event_reader *reader)
403 { 560 {
404 if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) { 561 if (reader->socket_buffer_size - reader->input_stream.avail_in < 128 * 1024) {
405 reader->storage *= 2; 562 reader->socket_buffer_size *= 2;
406 uint8_t *new_buf = malloc(reader->storage); 563 uint8_t *new_buf = malloc(reader->socket_buffer_size);
407 memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); 564 memcpy(new_buf, reader->input_stream.next_in, reader->input_stream.avail_in);
408 free(reader->buffer.data); 565 free(reader->socket_buffer);
409 reader->buffer.data = new_buf; 566 reader->socket_buffer = new_buf;
410 reader->buffer.size -= reader->buffer.cur_pos; 567 reader->input_stream.next_in = new_buf;
411 reader->buffer.cur_pos = 0; 568 } else if (
412 } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) { 569 reader->input_stream.next_in - reader->socket_buffer >= reader->input_stream.avail_in
570 && reader->input_stream.next_in - reader->socket_buffer + reader->input_stream.avail_in >= reader->socket_buffer_size/2
571 ) {
572 memmove(reader->socket_buffer, reader->input_stream.next_in, reader->input_stream.avail_in);
573 reader->input_stream.next_in = reader->socket_buffer;
574 }
575 uint8_t *space_start = reader->input_stream.next_in + reader->input_stream.avail_in;
576 size_t space = (reader->socket_buffer + reader->socket_buffer_size) - space_start;
577 int bytes = recv(reader->socket, space_start, space, 0);
578 if (bytes >= 0) {
579 reader->input_stream.avail_in += bytes;
580 } else if (!socket_error_is_wouldblock()) {
581 fatal_error("Connection closed, error = %X\n", socket_last_error());
582 }
583 }
584
585 static void inflate_flush(event_reader *reader)
586 {
587 if (reader->buffer.cur_pos > reader->storage / 2) {
413 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); 588 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
414 reader->buffer.size -= reader->buffer.cur_pos; 589 reader->buffer.size -= reader->buffer.cur_pos;
415 reader->buffer.cur_pos = 0; 590 reader->buffer.cur_pos = 0;
416 } 591 reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
417 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); 592 reader->input_stream.avail_out = reader->storage - reader->buffer.size;
418 if (bytes >= 0) { 593 }
419 reader->buffer.size += bytes; 594 int result = inflate(&reader->input_stream, Z_SYNC_FLUSH);
420 } else if (!socket_error_is_wouldblock()) { 595 if (Z_OK != result && Z_STREAM_END != result) {
421 fatal_error("Connection closed, error = %X\n", socket_last_error()); 596 fatal_error("inflate returned %d\n", result);
422 } 597 }
598 reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
599 if (result == Z_STREAM_END && (reader->socket || reader->input_stream.avail_in)) {
600 inflateReset(&reader->input_stream);
601 if (reader->input_stream.avail_in) {
602 inflate_flush(reader);
603 }
604 }
605
423 } 606 }
424 607
425 void reader_ensure_data(event_reader *reader, size_t bytes) 608 void reader_ensure_data(event_reader *reader, size_t bytes)
426 { 609 {
427 if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) { 610 if (reader->buffer.size - reader->buffer.cur_pos < bytes) {
428 socket_blocking(reader->socket, 1); 611 if (reader->socket) {
429 while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
430 read_from_socket(reader); 612 read_from_socket(reader);
431 } 613 }
432 socket_blocking(reader->socket, 0); 614 if (reader->input_stream.avail_in) {
615 inflate_flush(reader);
616 }
617 if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) {
618 socket_blocking(reader->socket, 1);
619 while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
620 read_from_socket(reader);
621 inflate_flush(reader);
622 }
623 socket_blocking(reader->socket, 0);
624 }
433 } 625 }
434 } 626 }
435 627
436 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) 628 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out)
437 { 629 {
439 reader->repeat_remaining--; 631 reader->repeat_remaining--;
440 *cycle_out = reader->last_cycle + reader->repeat_delta; 632 *cycle_out = reader->last_cycle + reader->repeat_delta;
441 reader->last_cycle = *cycle_out; 633 reader->last_cycle = *cycle_out;
442 return reader->repeat_event; 634 return reader->repeat_event;
443 } 635 }
444 if (reader->socket) { 636 reader_ensure_data(reader, 1);
445 read_from_socket(reader);
446 reader_ensure_data(reader, 1);
447 }
448 uint8_t header = load_int8(&reader->buffer); 637 uint8_t header = load_int8(&reader->buffer);
449 uint8_t ret; 638 uint8_t ret;
450 uint32_t delta; 639 uint32_t delta;
451 uint8_t multi_start = 0; 640 uint8_t multi_start = 0;
452 if ((header & 0xF0) == (EVENT_MULTI << 4)) { 641 if ((header & 0xF0) == (EVENT_MULTI << 4)) {