comparison event_log.c @ 1983:a7b753e260a2 mame_interp

Merge from default
author Michael Pavone <pavone@retrodev.com>
date Sat, 09 May 2020 23:39:44 -0700
parents 04b79a725b7f
children a042e046f7f2
comparison
equal deleted inserted replaced
1937:cafde1255ad3 1983:a7b753e260a2
1 #ifdef _WIN32
2 #define WINVER 0x501
3 #include <winsock2.h>
4 #include <ws2tcpip.h>
5 #else
6 #include <sys/types.h>
7 #include <sys/socket.h>
8 #include <unistd.h>
9 #include <netdb.h>
10 #include <netinet/tcp.h>
11 #endif
12
13 #include <errno.h>
14 #include "event_log.h"
15 #include "util.h"
16 #include "blastem.h"
17 #include "saves.h"
18 #include "zlib/zlib.h"
19
//Command bytes a remote viewer sends back over the event log socket.
//Each one is followed by a single byte holding pad << 5 | button.
enum {
	CMD_GAMEPAD_DOWN = 0,
	CMD_GAMEPAD_UP = 1
};
24
25 static uint8_t active, fully_active;
26 static FILE *event_file;
27 static serialize_buffer buffer;
28 static uint8_t *compressed;
29 static size_t compressed_storage;
30 static z_stream output_stream;
31 static uint32_t last;
32
33 static void event_log_common_init(void)
34 {
35 init_serialize(&buffer);
36 compressed_storage = 128*1024;
37 compressed = malloc(compressed_storage);
38 deflateInit(&output_stream, 9);
39 output_stream.avail_out = compressed_storage;
40 output_stream.next_out = compressed;
41 output_stream.avail_in = 0;
42 output_stream.next_in = buffer.data;
43 last = 0;
44 active = 1;
45 }
46
47 static uint8_t multi_count;
48 static size_t multi_start;
49 static void finish_multi(void)
50 {
51 buffer.data[multi_start] |= multi_count - 2;
52 multi_count = 0;
53 }
54
55 static void file_finish(void)
56 {
57 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
58 output_stream.next_out = compressed;
59 output_stream.avail_out = compressed_storage;
60 int result = deflate(&output_stream, Z_FINISH);
61 if (Z_STREAM_END != result) {
62 fatal_error("Final deflate call returned %d\n", result);
63 }
64 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
65 fclose(event_file);
66 }
67
68 static const char el_ident[] = "BLSTEL\x02\x00";
69 void event_log_file(char *fname)
70 {
71 event_file = fopen(fname, "wb");
72 if (!event_file) {
73 warning("Failed to open event file %s for writing\n", fname);
74 return;
75 }
76 fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file);
77 event_log_common_init();
78 fully_active = 1;
79 atexit(file_finish);
80 }
81
//State kept for one connected remote viewer
typedef struct remote remote;
struct remote {
	//next byte of the compressed output to send; NULL until the remote got its initial state
	uint8_t *send_progress;
	//connected socket descriptor
	int sock;
	//player numbers controlled by this remote
	uint8_t players[1]; //TODO: Expand when support for multiple players per remote is added
	//number of valid entries in players
	uint8_t num_players;
};
88
//listening socket for remote viewers; 0 while TCP logging is not set up
static int listen_sock;
static remote remotes[7];
static int num_remotes;
//player numbers that are not assigned to a remote at the moment
static uint8_t available_players[7] = {2,3,4,5,6,7,8};
static int num_available_players = 7;

//Starts listening for remote event log viewers on address:port.
//Any failure prints a warning and leaves TCP logging disabled (listen_sock == 0).
void event_log_tcp(char *address, char *port)
{
	struct addrinfo request, *result = NULL;
	socket_init();
	memset(&request, 0, sizeof(request));
	request.ai_family = AF_INET;
	request.ai_socktype = SOCK_STREAM;
	request.ai_flags = AI_PASSIVE;
	//result is only valid when getaddrinfo succeeds
	if (getaddrinfo(address, port, &request, &result) || !result) {
		warning("Failed to resolve event log listen address %s:%s\n", address, port);
		return;
	}

	listen_sock = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
	if (listen_sock < 0) {
		warning("Failed to open event log listen socket on %s:%s\n", address, port);
		//other functions test listen_sock for truth, so don't leave an error value behind
		listen_sock = 0;
		goto cleanup_address;
	}
	int param = 1;
	setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (const char *)&param, sizeof(param));
	if (bind(listen_sock, result->ai_addr, result->ai_addrlen) < 0) {
		warning("Failed to bind event log listen socket on %s:%s\n", address, port);
		socket_close(listen_sock);
		listen_sock = 0;
		goto cleanup_address;
	}
	if (listen(listen_sock, 3) < 0) {
		warning("Failed to listen for event log remotes on %s:%s\n", address, port);
		socket_close(listen_sock);
		listen_sock = 0;
		goto cleanup_address;
	}
	//new connections are polled from flush_socket, so accept must not block
	socket_blocking(listen_sock, 0);
	event_log_common_init();
cleanup_address:
	freeaddrinfo(result);
}
126
127 static uint8_t *system_start;
128 static size_t system_start_size;
129 void event_system_start(system_type stype, vid_std video_std, char *name)
130 {
131 if (!active) {
132 return;
133 }
134 save_int8(&buffer, stype);
135 save_int8(&buffer, video_std);
136 size_t name_len = strlen(name);
137 if (name_len > 255) {
138 name_len = 255;
139 }
140 save_int8(&buffer, name_len);
141 save_buffer8(&buffer, name, strlen(name));
142 if (listen_sock) {
143 system_start = malloc(buffer.size);
144 system_start_size = buffer.size;
145 memcpy(system_start, buffer.data, buffer.size);
146 } else {
147 //system start header is never compressed, so write to file immediately
148 fwrite(buffer.data, 1, buffer.size, event_file);
149 }
150 buffer.size = 0;
151 }
152
153 //header formats
154 //Single byte: 4 bit type, 4 bit delta (16-31)
155 //Three Byte: 8 bit type, 16-bit delta
156 //Four byte: 8-bit type, 24-bit signed delta
157 #define FORMAT_3BYTE 0xE0
158 #define FORMAT_4BYTE 0xF0
159 static uint8_t last_event_type = 0xFF;
160 static uint32_t last_delta;
161 static void event_header(uint8_t type, uint32_t cycle)
162 {
163 uint32_t delta = cycle - last;
164 if (multi_count) {
165 if (type != last_event_type || delta != last_delta) {
166 finish_multi();
167 } else {
168 ++multi_count;
169 if (multi_count == 17) {
170 finish_multi();
171 last_event_type = 0xFF;
172 }
173 return;
174 }
175 } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) {
176 //make some room
177 save_int8(&buffer, 0);
178 //shift existing command
179 memmove(buffer.data + multi_start + 1, buffer.data + multi_start, buffer.size - multi_start - 1);
180 buffer.data[multi_start] = EVENT_MULTI << 4;
181 multi_count = 2;
182 return;
183 }
184 multi_start = buffer.size;
185 last_event_type = type;
186 last_delta = delta;
187
188 if (delta > 65535) {
189 save_int8(&buffer, FORMAT_4BYTE | type);
190 save_int8(&buffer, delta >> 16);
191 save_int16(&buffer, delta);
192 } else if (delta >= 16 && delta < 32) {
193 save_int8(&buffer, type << 4 | (delta - 16));
194 } else {
195 save_int8(&buffer, FORMAT_3BYTE | type);
196 save_int16(&buffer, delta);
197 }
198 }
199
200 void event_cycle_adjust(uint32_t cycle, uint32_t deduction)
201 {
202 if (!fully_active) {
203 return;
204 }
205 event_header(EVENT_ADJUST, cycle);
206 last = cycle - deduction;
207 save_int32(&buffer, deduction);
208 }
209
210 static uint8_t next_available_player(void)
211 {
212 uint8_t lowest = 0xFF;
213 int lowest_index = -1;
214 for (int i = 0; i < num_available_players; i++)
215 {
216 if (available_players[i] < lowest) {
217 lowest = available_players[i];
218 lowest_index = i;
219 }
220 }
221 if (lowest_index >= 0) {
222 available_players[lowest_index] = available_players[num_available_players - 1];
223 --num_available_players;
224 }
225 return lowest;
226 }
227
228 static void flush_socket(void)
229 {
230 int remote_sock = accept(listen_sock, NULL, NULL);
231 if (remote_sock != -1) {
232 if (num_remotes == 7) {
233 socket_close(remote_sock);
234 } else {
235 printf("remote %d connected\n", num_remotes);
236 uint8_t player = next_available_player();
237 remotes[num_remotes++] = (remote){
238 .sock = remote_sock,
239 .send_progress = NULL,
240 .players = {player},
241 .num_players = player == 0xFF ? 0 : 1
242 };
243 current_system->save_state = EVENTLOG_SLOT + 1;
244 }
245 }
246 uint8_t *min_progress = compressed;
247 for (int i = 0; i < num_remotes; i++) {
248 if (remotes[i].send_progress) {
249 uint8_t recv_buffer[1500];
250 int bytes = recv(remotes[i].sock, recv_buffer, sizeof(recv_buffer), 0);
251 for (int j = 0; j < bytes; j++)
252 {
253 uint8_t cmd = recv_buffer[j];
254 switch(cmd)
255 {
256 case CMD_GAMEPAD_DOWN:
257 case CMD_GAMEPAD_UP: {
258 ++j;
259 if (j < bytes) {
260 uint8_t button = recv_buffer[j];
261 uint8_t pad = (button >> 5) - 1;
262 button &= 0x1F;
263 if (pad < remotes[i].num_players) {
264 pad = remotes[i].players[pad];
265 if (cmd == CMD_GAMEPAD_DOWN) {
266 current_system->gamepad_down(current_system, pad, button);
267 } else {
268 current_system->gamepad_up(current_system, pad, button);
269 }
270 }
271 } else {
272 warning("Received incomplete command %X\n", cmd);
273 }
274 break;
275 }
276 default:
277 warning("Unrecognized remote command %X\n", cmd);
278 j = bytes;
279 }
280 }
281 int sent = 1;
282 while (sent && output_stream.next_out > remotes[i].send_progress)
283 {
284 sent = send(remotes[i].sock, remotes[i].send_progress, output_stream.next_out - remotes[i].send_progress, 0);
285 if (sent >= 0) {
286 remotes[i].send_progress += sent;
287 } else if (!socket_error_is_wouldblock()) {
288 socket_close(remotes[i].sock);
289 for (int j = 0; j < remotes[i].num_players; j++) {
290 available_players[num_available_players++] = remotes[i].players[j];
291 }
292 remotes[i] = remotes[num_remotes-1];
293 num_remotes--;
294 if (!num_remotes) {
295 //last remote disconnected, reset buffers/deflate
296 fully_active = 0;
297 deflateReset(&output_stream);
298 output_stream.next_out = compressed;
299 output_stream.avail_out = compressed_storage;
300 buffer.size = 0;
301 }
302 i--;
303 break;
304 }
305 if (remotes[i].send_progress > min_progress) {
306 min_progress = remotes[i].send_progress;
307 }
308 }
309 }
310 }
311 if (min_progress == output_stream.next_out) {
312 output_stream.next_out = compressed;
313 output_stream.avail_out = compressed_storage;
314 for (int i = 0; i < num_remotes; i++) {
315 if (remotes[i].send_progress) {
316 remotes[i].send_progress = compressed;
317 }
318 }
319 }
320 }
321
322 uint8_t wrote_since_last_flush;
323 void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload)
324 {
325 if (!fully_active) {
326 return;
327 }
328 event_header(type, cycle);
329 last = cycle;
330 save_buffer8(&buffer, payload, size);
331 if (!multi_count) {
332 last_event_type = 0xFF;
333 output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
334 int result = deflate(&output_stream, Z_NO_FLUSH);
335 if (result != Z_OK) {
336 fatal_error("deflate returned %d\n", result);
337 }
338 if (listen_sock) {
339 if ((output_stream.next_out - compressed) > 1280 || !output_stream.avail_out) {
340 flush_socket();
341 wrote_since_last_flush = 1;
342 }
343 } else if (!output_stream.avail_out) {
344 fwrite(compressed, 1, compressed_storage, event_file);
345 output_stream.next_out = compressed;
346 output_stream.avail_out = compressed_storage;
347 }
348 if (!output_stream.avail_in) {
349 buffer.size = 0;
350 output_stream.next_in = buffer.data;
351 }
352 }
353 }
354
355 static uint32_t last_word_address;
356 void event_vram_word(uint32_t cycle, uint32_t address, uint16_t value)
357 {
358 uint32_t delta = address - last_word_address;
359 if (delta < 256) {
360 uint8_t buffer[3] = {delta, value >> 8, value};
361 event_log(EVENT_VRAM_WORD_DELTA, cycle, sizeof(buffer), buffer);
362 } else {
363 uint8_t buffer[5] = {address >> 16, address >> 8, address, value >> 8, value};
364 event_log(EVENT_VRAM_WORD, cycle, sizeof(buffer), buffer);
365 }
366 last_word_address = address;
367 }
368
369 static uint32_t last_byte_address;
370 void event_vram_byte(uint32_t cycle, uint16_t address, uint8_t byte, uint8_t auto_inc)
371 {
372 uint32_t delta = address - last_byte_address;
373 if (delta == 1) {
374 event_log(EVENT_VRAM_BYTE_ONE, cycle, sizeof(byte), &byte);
375 } else if (delta == auto_inc) {
376 event_log(EVENT_VRAM_BYTE_AUTO, cycle, sizeof(byte), &byte);
377 } else if (delta < 256) {
378 uint8_t buffer[2] = {delta, byte};
379 event_log(EVENT_VRAM_BYTE_DELTA, cycle, sizeof(buffer), buffer);
380 } else {
381 uint8_t buffer[3] = {address >> 8, address, byte};
382 event_log(EVENT_VRAM_BYTE, cycle, sizeof(buffer), buffer);
383 }
384 last_byte_address = address;
385 }
386
//Calls send repeatedly until size bytes from data were sent or send reports an
//error or sends nothing.
//Returns the number of bytes actually sent, which is less than size on failure.
static size_t send_all(int sock, uint8_t *data, size_t size, int flags)
{
	size_t total = 0;
	while (total < size)
	{
		//send reports errors as -1, so the result must be kept in a signed type
		long sent = send(sock, data + total, size - total, flags);
		if (sent <= 0) {
			break;
		}
		total += sent;
	}
	return total;
}
399
400 void deflate_flush(uint8_t full)
401 {
402 output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
403 uint8_t force = full;
404 while (output_stream.avail_in || force)
405 {
406 if (!output_stream.avail_out) {
407 size_t old_storage = compressed_storage;
408 uint8_t *old_compressed = compressed;
409 compressed_storage *= 2;
410 compressed = realloc(compressed, compressed_storage);
411 output_stream.next_out = compressed + old_storage;
412 output_stream.avail_out = old_storage;
413 for (int i = 0; i < num_remotes; i++) {
414 if (remotes[i].send_progress) {
415 remotes[i].send_progress = compressed + (remotes[i].send_progress - old_compressed);
416 }
417 }
418 }
419 int result = deflate(&output_stream, full ? Z_FINISH : Z_SYNC_FLUSH);
420 if (result != (full ? Z_STREAM_END : Z_OK)) {
421 fatal_error("deflate returned %d\n", result);
422 }
423 if (full && result == Z_STREAM_END) {
424 result = deflateReset(&output_stream);
425 if (result != Z_OK) {
426 fatal_error("deflateReset returned %d\n", result);
427 }
428 }
429 force = 0;
430 }
431 output_stream.next_in = buffer.data;
432 buffer.size = 0;
433 }
434
435 void event_state(uint32_t cycle, serialize_buffer *state)
436 {
437 if (!fully_active) {
438 last = cycle;
439 }
440 uint8_t header[] = {
441 EVENT_STATE << 4, last >> 24, last >> 16, last >> 8, last,
442 last_word_address >> 16, last_word_address >> 8, last_word_address,
443 last_byte_address >> 8, last_byte_address,
444 state->size >> 16, state->size >> 8, state->size
445 };
446 uint8_t sent_system_start = 0;
447 for (int i = 0; i < num_remotes; i++)
448 {
449 if (!remotes[i].send_progress) {
450 if (send_all(remotes[i].sock, system_start, system_start_size, 0) == system_start_size) {
451 sent_system_start = 1;
452 } else {
453 socket_close(remotes[i].sock);
454 remotes[i] = remotes[num_remotes-1];
455 num_remotes--;
456 i--;
457 }
458 }
459 }
460 if (sent_system_start) {
461 if (fully_active) {
462 if (multi_count) {
463 finish_multi();
464 }
465 //full flush is needed so new and old clients can share a stream
466 deflate_flush(1);
467 }
468 save_buffer8(&buffer, header, sizeof(header));
469 save_buffer8(&buffer, state->data, state->size);
470 size_t old_compressed_size = output_stream.next_out - compressed;
471 deflate_flush(1);
472 size_t state_size = output_stream.next_out - compressed - old_compressed_size;
473 for (int i = 0; i < num_remotes; i++) {
474 if (!remotes[i].send_progress) {
475 if (send_all(remotes[i].sock, compressed + old_compressed_size, state_size, 0) == state_size) {
476 remotes[i].send_progress = compressed + old_compressed_size;
477 socket_blocking(remotes[i].sock, 0);
478 int flag = 1;
479 setsockopt(remotes[i].sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
480 fully_active = 1;
481 } else {
482 socket_close(remotes[i].sock);
483 remotes[i] = remotes[num_remotes-1];
484 num_remotes--;
485 i--;
486 }
487 }
488 }
489 output_stream.next_out = compressed + old_compressed_size;
490 output_stream.avail_out = compressed_storage - old_compressed_size;
491 }
492 }
493
494 void event_flush(uint32_t cycle)
495 {
496 if (!active) {
497 return;
498 }
499 if (fully_active) {
500 event_header(EVENT_FLUSH, cycle);
501 last = cycle;
502
503 deflate_flush(0);
504 }
505 if (event_file) {
506 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
507 fflush(event_file);
508 output_stream.next_out = compressed;
509 output_stream.avail_out = compressed_storage;
510 } else if (listen_sock) {
511 flush_socket();
512 wrote_since_last_flush = 0;
513 }
514 }
515
516 void event_soft_flush(uint32_t cycle)
517 {
518 if (!fully_active || wrote_since_last_flush || event_file) {
519 return;
520 }
521 event_header(EVENT_FLUSH, cycle);
522 last = cycle;
523
524 deflate_flush(0);
525 flush_socket();
526 }
527
528 static void init_event_reader_common(event_reader *reader)
529 {
530 reader->last_cycle = 0;
531 reader->repeat_event = 0xFF;
532 reader->storage = 512 * 1024;
533 init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage);
534 reader->buffer.size = 0;
535 memset(&reader->input_stream, 0, sizeof(reader->input_stream));
536
537 }
538
539 void init_event_reader(event_reader *reader, uint8_t *data, size_t size)
540 {
541 reader->socket = 0;
542 reader->last_cycle = 0;
543 reader->repeat_event = 0xFF;
544 init_event_reader_common(reader);
545 uint8_t name_len = data[1];
546 reader->buffer.size = name_len + 2;
547 memcpy(reader->buffer.data, data, reader->buffer.size);
548 reader->input_stream.next_in = data + reader->buffer.size;
549 reader->input_stream.avail_in = size - reader->buffer.size;
550
551 int result = inflateInit(&reader->input_stream);
552 if (Z_OK != result) {
553 fatal_error("inflateInit returned %d\n", result);
554 }
555 reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
556 reader->input_stream.avail_out = reader->storage - reader->buffer.size;
557 result = inflate(&reader->input_stream, Z_NO_FLUSH);
558 if (Z_OK != result && Z_STREAM_END != result) {
559 fatal_error("inflate returned %d\n", result);
560 }
561 reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
562 }
563
564 void init_event_reader_tcp(event_reader *reader, char *address, char *port)
565 {
566 struct addrinfo request, *result;
567 socket_init();
568 memset(&request, 0, sizeof(request));
569 request.ai_family = AF_INET;
570 request.ai_socktype = SOCK_STREAM;
571 request.ai_flags = AI_PASSIVE;
572 getaddrinfo(address, port, &request, &result);
573
574 reader->socket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
575 if (reader->socket < 0) {
576 fatal_error("Failed to create socket for event log connection to %s:%s\n", address, port);
577 }
578 if (connect(reader->socket, result->ai_addr, result->ai_addrlen) < 0) {
579 fatal_error("Failed to connect to %s:%s for event log stream\n", address, port);
580 }
581
582 init_event_reader_common(reader);
583 reader->socket_buffer_size = 256 * 1024;
584 reader->socket_buffer = malloc(reader->socket_buffer_size);
585
586 while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2])
587 {
588 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
589 if (bytes < 0) {
590 fatal_error("Failed to receive system init from %s:%s\n", address, port);
591 }
592 reader->buffer.size += bytes;
593 }
594 size_t init_msg_len = 3 + reader->buffer.data[2];
595 memcpy(reader->socket_buffer, reader->buffer.data + init_msg_len, reader->buffer.size - init_msg_len);
596 reader->input_stream.next_in = reader->socket_buffer;
597 reader->input_stream.avail_in = reader->buffer.size - init_msg_len;
598 reader->buffer.size = init_msg_len;
599 int res = inflateInit(&reader->input_stream);
600 if (Z_OK != res) {
601 fatal_error("inflateInit returned %d\n", res);
602 }
603 reader->input_stream.next_out = reader->buffer.data + init_msg_len;
604 reader->input_stream.avail_out = reader->storage - init_msg_len;
605 res = inflate(&reader->input_stream, Z_NO_FLUSH);
606 if (Z_OK != res && Z_BUF_ERROR != res) {
607 fatal_error("inflate returned %d in init_event_reader_tcp\n", res);
608 }
609 int flag = 1;
610 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
611 }
612
613 static void read_from_socket(event_reader *reader)
614 {
615 if (reader->socket_buffer_size - reader->input_stream.avail_in < 128 * 1024) {
616 reader->socket_buffer_size *= 2;
617 uint8_t *new_buf = malloc(reader->socket_buffer_size);
618 memcpy(new_buf, reader->input_stream.next_in, reader->input_stream.avail_in);
619 free(reader->socket_buffer);
620 reader->socket_buffer = new_buf;
621 reader->input_stream.next_in = new_buf;
622 } else if (
623 reader->input_stream.next_in - reader->socket_buffer >= reader->input_stream.avail_in
624 && reader->input_stream.next_in - reader->socket_buffer + reader->input_stream.avail_in >= reader->socket_buffer_size/2
625 ) {
626 memmove(reader->socket_buffer, reader->input_stream.next_in, reader->input_stream.avail_in);
627 reader->input_stream.next_in = reader->socket_buffer;
628 }
629 uint8_t *space_start = reader->input_stream.next_in + reader->input_stream.avail_in;
630 size_t space = (reader->socket_buffer + reader->socket_buffer_size) - space_start;
631 int bytes = recv(reader->socket, space_start, space, 0);
632 if (bytes >= 0) {
633 reader->input_stream.avail_in += bytes;
634 } else if (!socket_error_is_wouldblock()) {
635 fatal_error("Connection closed, error = %X\n", socket_last_error());
636 }
637 }
638
639 static void inflate_flush(event_reader *reader)
640 {
641 if (reader->buffer.cur_pos > reader->storage / 2) {
642 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
643 reader->buffer.size -= reader->buffer.cur_pos;
644 reader->buffer.cur_pos = 0;
645 reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
646 reader->input_stream.avail_out = reader->storage - reader->buffer.size;
647 }
648 int result = inflate(&reader->input_stream, Z_SYNC_FLUSH);
649 if (Z_OK != result && Z_STREAM_END != result) {
650 fatal_error("inflate returned %d\n", result);
651 }
652 reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
653 if (result == Z_STREAM_END && (reader->socket || reader->input_stream.avail_in)) {
654 inflateReset(&reader->input_stream);
655 if (reader->input_stream.avail_in) {
656 inflate_flush(reader);
657 }
658 }
659
660 }
661
662 void reader_ensure_data(event_reader *reader, size_t bytes)
663 {
664 if (reader->buffer.size - reader->buffer.cur_pos < bytes) {
665 if (reader->input_stream.avail_in) {
666 inflate_flush(reader);
667 }
668 if (reader->socket) {
669 while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
670 read_from_socket(reader);
671 inflate_flush(reader);
672 }
673 }
674 }
675 }
676
677 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out)
678 {
679 if (reader->repeat_remaining) {
680 reader->repeat_remaining--;
681 *cycle_out = reader->last_cycle + reader->repeat_delta;
682 reader->last_cycle = *cycle_out;
683 return reader->repeat_event;
684 }
685 reader_ensure_data(reader, 1);
686 uint8_t header = load_int8(&reader->buffer);
687 uint8_t ret;
688 uint32_t delta;
689 uint8_t multi_start = 0;
690 if ((header & 0xF0) == (EVENT_MULTI << 4)) {
691 reader->repeat_remaining = (header & 0xF) + 1;
692 multi_start = 1;
693 reader_ensure_data(reader, 1);
694 header = load_int8(&reader->buffer);
695 }
696 if ((header & 0xF0) < FORMAT_3BYTE) {
697 delta = (header & 0xF) + 16;
698 ret = header >> 4;
699 } else if ((header & 0xF0) == FORMAT_3BYTE) {
700 reader_ensure_data(reader, 2);
701 delta = load_int16(&reader->buffer);
702 ret = header & 0xF;
703 } else {
704 reader_ensure_data(reader, 3);
705 delta = load_int8(&reader->buffer) << 16;
706 //sign extend 24-bit delta to 32-bit
707 if (delta & 0x800000) {
708 delta |= 0xFF000000;
709 }
710 delta |= load_int16(&reader->buffer);
711 ret = header & 0xF;
712 }
713 if (multi_start) {
714 reader->repeat_event = ret;
715 reader->repeat_delta = delta;
716 }
717 *cycle_out = reader->last_cycle + delta;
718 reader->last_cycle = *cycle_out;
719 if (ret == EVENT_ADJUST) {
720 reader_ensure_data(reader, 4);
721 size_t old_pos = reader->buffer.cur_pos;
722 uint32_t adjust = load_int32(&reader->buffer);
723 reader->buffer.cur_pos = old_pos;
724 reader->last_cycle -= adjust;
725 } else if (ret == EVENT_STATE) {
726 reader_ensure_data(reader, 8);
727 reader->last_cycle = load_int32(&reader->buffer);
728 reader->last_word_address = load_int8(&reader->buffer) << 16;
729 reader->last_word_address |= load_int16(&reader->buffer);
730 reader->last_byte_address = load_int16(&reader->buffer);
731 }
732 return ret;
733 }
734
735 uint8_t reader_system_type(event_reader *reader)
736 {
737 return load_int8(&reader->buffer);
738 }
739
740 void reader_send_gamepad_event(event_reader *reader, uint8_t pad, uint8_t button, uint8_t down)
741 {
742 uint8_t buffer[] = {down ? CMD_GAMEPAD_DOWN : CMD_GAMEPAD_UP, pad << 5 | button};
743 //TODO: Deal with the fact that we're not in blocking mode so this may not actually send all
744 //if the buffer is full
745 send_all(reader->socket, buffer, sizeof(buffer), 0);
746 }