comparison event_log.c @ 1949:5a76a7373823

Get WIP net play code compiling on Windows and clean up some unistd.h includes
author Michael Pavone <pavone@retrodev.com>
date Thu, 30 Apr 2020 23:15:50 -0700
parents d01527615c7c
children 1c7af12efe8b
comparison
equal deleted inserted replaced
1948:d01527615c7c 1949:5a76a7373823
4 #include <ws2tcpip.h> 4 #include <ws2tcpip.h>
5 #else 5 #else
6 #include <sys/types.h> 6 #include <sys/types.h>
7 #include <sys/socket.h> 7 #include <sys/socket.h>
8 #include <unistd.h> 8 #include <unistd.h>
9 #include <fcntl.h>
10 #include <netdb.h> 9 #include <netdb.h>
11 #include <netinet/tcp.h> 10 #include <netinet/tcp.h>
12 #endif 11 #endif
13 12
14 #include <errno.h> 13 #include <errno.h>
44 static int listen_sock, remotes[7]; 43 static int listen_sock, remotes[7];
45 static int num_remotes; 44 static int num_remotes;
46 void event_log_tcp(char *address, char *port) 45 void event_log_tcp(char *address, char *port)
47 { 46 {
48 struct addrinfo request, *result; 47 struct addrinfo request, *result;
48 socket_init();
49 memset(&request, 0, sizeof(request)); 49 memset(&request, 0, sizeof(request));
50 request.ai_family = AF_INET; 50 request.ai_family = AF_INET;
51 request.ai_socktype = SOCK_STREAM; 51 request.ai_socktype = SOCK_STREAM;
52 request.ai_flags = AI_PASSIVE; 52 request.ai_flags = AI_PASSIVE;
53 getaddrinfo(address, port, &request, &result); 53 getaddrinfo(address, port, &request, &result);
55 listen_sock = socket(result->ai_family, result->ai_socktype, result->ai_protocol); 55 listen_sock = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
56 if (listen_sock < 0) { 56 if (listen_sock < 0) {
57 warning("Failed to open event log listen socket on %s:%s\n", address, port); 57 warning("Failed to open event log listen socket on %s:%s\n", address, port);
58 goto cleanup_address; 58 goto cleanup_address;
59 } 59 }
60 int non_block = 1; 60 int param = 1;
61 setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &non_block, sizeof(non_block)); 61 setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (const char *)&param, sizeof(param));
62 if (bind(listen_sock, result->ai_addr, result->ai_addrlen) < 0) { 62 if (bind(listen_sock, result->ai_addr, result->ai_addrlen) < 0) {
63 warning("Failed to bind event log listen socket on %s:%s\n", address, port); 63 warning("Failed to bind event log listen socket on %s:%s\n", address, port);
64 close(listen_sock); 64 socket_close(listen_sock);
65 goto cleanup_address; 65 goto cleanup_address;
66 } 66 }
67 if (listen(listen_sock, 3) < 0) { 67 if (listen(listen_sock, 3) < 0) {
68 warning("Failed to listen for event log remotes on %s:%s\n", address, port); 68 warning("Failed to listen for event log remotes on %s:%s\n", address, port);
69 close(listen_sock); 69 socket_close(listen_sock);
70 goto cleanup_address; 70 goto cleanup_address;
71 } 71 }
72 fcntl(listen_sock, F_SETFL, O_NONBLOCK); 72 socket_blocking(listen_sock, 0);
73 active = 1; 73 active = 1;
74 cleanup_address: 74 cleanup_address:
75 freeaddrinfo(result); 75 freeaddrinfo(result);
76 } 76 }
77 77
134 static void flush_socket(void) 134 static void flush_socket(void)
135 { 135 {
136 int remote = accept(listen_sock, NULL, NULL); 136 int remote = accept(listen_sock, NULL, NULL);
137 if (remote != -1) { 137 if (remote != -1) {
138 if (num_remotes == 7) { 138 if (num_remotes == 7) {
139 close(remote); 139 socket_close(remote);
140 } else { 140 } else {
141 printf("remote %d connected\n", num_remotes); 141 printf("remote %d connected\n", num_remotes);
142 remotes[num_remotes] = remote; 142 remotes[num_remotes] = remote;
143 remote_needs_state[num_remotes++] = 1; 143 remote_needs_state[num_remotes++] = 1;
144 current_system->save_state = EVENTLOG_SLOT + 1; 144 current_system->save_state = EVENTLOG_SLOT + 1;
145 } 145 }
146 } 146 }
147 size_t min_progress = 0; 147 size_t min_progress = 0;
148 for (int i = 0; i < num_remotes; i++) { 148 for (int i = 0; i < num_remotes; i++) {
149 errno = 0;
150 int sent = 1; 149 int sent = 1;
151 if (remote_needs_state[i]) { 150 if (remote_needs_state[i]) {
152 remote_send_progress[i] = buffer.size; 151 remote_send_progress[i] = buffer.size;
153 } else { 152 } else {
154 uint8_t buffer[1500]; 153 uint8_t buffer[1500];
184 while (sent && buffer.size - remote_send_progress[i]) 183 while (sent && buffer.size - remote_send_progress[i])
185 { 184 {
186 sent = send(remotes[i], buffer.data + remote_send_progress[i], buffer.size - remote_send_progress[i], 0); 185 sent = send(remotes[i], buffer.data + remote_send_progress[i], buffer.size - remote_send_progress[i], 0);
187 if (sent >= 0) { 186 if (sent >= 0) {
188 remote_send_progress[i] += sent; 187 remote_send_progress[i] += sent;
189 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { 188 } else if (socket_error_is_wouldblock()) {
190 close(remotes[i]); 189 socket_close(remotes[i]);
191 remotes[i] = remotes[num_remotes-1]; 190 remotes[i] = remotes[num_remotes-1];
192 remote_send_progress[i] = remote_send_progress[num_remotes-1]; 191 remote_send_progress[i] = remote_send_progress[num_remotes-1];
193 remote_needs_state[i] = remote_needs_state[num_remotes-1]; 192 remote_needs_state[i] = remote_needs_state[num_remotes-1];
194 num_remotes--; 193 num_remotes--;
195 i--; 194 i--;
283 && send_all(remotes[i], header, sizeof(header), 0) == sizeof(header) 282 && send_all(remotes[i], header, sizeof(header), 0) == sizeof(header)
284 && send_all(remotes[i], state->data, state->size, 0) == state->size 283 && send_all(remotes[i], state->data, state->size, 0) == state->size
285 ) { 284 ) {
286 remote_send_progress[i] = buffer.size; 285 remote_send_progress[i] = buffer.size;
287 remote_needs_state[i] = 0; 286 remote_needs_state[i] = 0;
288 fcntl(remotes[i], F_SETFL, O_NONBLOCK); 287 socket_blocking(remotes[i], 0);
289 int flag = 1; 288 int flag = 1;
290 setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); 289 setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
291 fully_active = 1; 290 fully_active = 1;
292 } else { 291 } else {
293 close(remotes[i]); 292 socket_close(remotes[i]);
294 remotes[i] = remotes[num_remotes-1]; 293 remotes[i] = remotes[num_remotes-1];
295 remote_send_progress[i] = remote_send_progress[num_remotes-1]; 294 remote_send_progress[i] = remote_send_progress[num_remotes-1];
296 remote_needs_state[i] = remote_needs_state[num_remotes-1]; 295 remote_needs_state[i] = remote_needs_state[num_remotes-1];
297 num_remotes--; 296 num_remotes--;
298 i--; 297 i--;
326 } 325 }
327 326
328 void init_event_reader_tcp(event_reader *reader, char *address, char *port) 327 void init_event_reader_tcp(event_reader *reader, char *address, char *port)
329 { 328 {
330 struct addrinfo request, *result; 329 struct addrinfo request, *result;
330 socket_init();
331 memset(&request, 0, sizeof(request)); 331 memset(&request, 0, sizeof(request));
332 request.ai_family = AF_INET; 332 request.ai_family = AF_INET;
333 request.ai_socktype = SOCK_STREAM; 333 request.ai_socktype = SOCK_STREAM;
334 request.ai_flags = AI_PASSIVE; 334 request.ai_flags = AI_PASSIVE;
335 getaddrinfo(address, port, &request, &result); 335 getaddrinfo(address, port, &request, &result);
352 if (bytes < 0) { 352 if (bytes < 0) {
353 fatal_error("Failed to receive system init from %s:%s\n", address, port); 353 fatal_error("Failed to receive system init from %s:%s\n", address, port);
354 } 354 }
355 reader->buffer.size += bytes; 355 reader->buffer.size += bytes;
356 } 356 }
357 fcntl(reader->socket, F_SETFL, O_NONBLOCK); 357 socket_blocking(reader->socket, 0);
358 int flag = 1; 358 int flag = 1;
359 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); 359 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
360 } 360 }
361 361
362 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) 362 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out)
363 { 363 {
364 if (reader->socket) { 364 if (reader->socket) {
365 uint8_t blocking = 0; 365 uint8_t blocking = 0;
366 if (reader->buffer.size - reader->buffer.cur_pos < 9) { 366 if (reader->buffer.size - reader->buffer.cur_pos < 9) {
367 //set back to block mode 367 //set back to block mode
368 fcntl(reader->socket, F_SETFL, 0); 368 socket_blocking(reader->socket, 1);
369 blocking = 1; 369 blocking = 1;
370 } 370 }
371 if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) { 371 if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) {
372 reader->storage *= 2; 372 reader->storage *= 2;
373 uint8_t *new_buf = malloc(reader->storage); 373 uint8_t *new_buf = malloc(reader->storage);
382 reader->buffer.cur_pos = 0; 382 reader->buffer.cur_pos = 0;
383 } 383 }
384 int bytes = 128; 384 int bytes = 128;
385 while (bytes > 127 && reader->buffer.size < reader->storage) 385 while (bytes > 127 && reader->buffer.size < reader->storage)
386 { 386 {
387 errno = 0;
388 bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); 387 bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
389 if (bytes >= 0) { 388 if (bytes >= 0) {
390 reader->buffer.size += bytes; 389 reader->buffer.size += bytes;
391 if (blocking && reader->buffer.size - reader->buffer.cur_pos >= 9) { 390 if (blocking && reader->buffer.size - reader->buffer.cur_pos >= 9) {
392 fcntl(reader->socket, F_SETFL, O_NONBLOCK); 391 socket_blocking(reader->socket, 0);
393 } 392 }
394 } else if (errno != EAGAIN && errno != EWOULDBLOCK) { 393 } else if (!socket_error_is_wouldblock()) {
395 puts("Connection closed"); 394 printf("Connection closed, error = %X\n", socket_last_error());
396 exit(0);
397 } 395 }
398 } 396 }
399 } 397 }
400 uint8_t header = load_int8(&reader->buffer); 398 uint8_t header = load_int8(&reader->buffer);
401 uint8_t ret; 399 uint8_t ret;