Skip to content

Commit cab039d

Browse files
committed
fix(ipc): handle multiple waiters
Signed-off-by: Marco Casaroli <marco.casaroli@gmail.com>
1 parent b18d5ae commit cab039d

3 files changed

Lines changed: 129 additions & 17 deletions

File tree

src/samples/supervisor/posix/client/client.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ struct ocre_container *ocre_context_create_container(struct ocre_context *contex
117117
uint8_t payload[LARGE_PAYLOAD_SIZE];
118118
bool success;
119119

120+
if (!context) {
121+
return NULL;
122+
}
123+
120124
/* Encode the request */
121125
ZCBOR_STATE_E(encoding_state, 0, payload, sizeof(payload), 0);
122126

src/samples/supervisor/posix/server/handlers.c

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,6 @@
3636
#define MAX_CONTAINERS 64
3737
#define MAX_STRING_ARRAY 32
3838

39-
/* Global ocre context for the server */
40-
// static struct ocre_context *ctx = NULL;
41-
42-
// static void print_hex(const char *label, const uint8_t *data, size_t len)
43-
// {
44-
// printf("HEX %s: ", label);
45-
// for (size_t i = 0; i < len; i++) {
46-
// printf("%02x ", data[i]);
47-
// }
48-
// printf("\n");
49-
// }
50-
5139
static int handle_context_create_container(struct ocre_context *ctx, zcbor_state_t *dec_state, uint8_t *tx_buf,
5240
size_t tx_buf_size)
5341
{

src/samples/supervisor/posix/server/waiters.c

Lines changed: 125 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
* SPDX-License-Identifier: Apache-2.0
66
*/
77

8+
#include <pthread.h>
89
#include <stdlib.h>
910
#include <string.h>
1011
#include <unistd.h>
1112
#include <stdio.h>
1213
#include <stdint.h>
14+
#include <poll.h>
1315

1416
#include <sys/socket.h>
1517

@@ -28,19 +30,28 @@ struct client {
2830
};
2931

3032
struct waiter {
33+
pthread_mutex_t mutex;
34+
bool finished;
35+
int exit_status;
36+
int result;
3137
struct ocre_container *container;
32-
pthread_t thread;
38+
pthread_t wait_thread;
39+
pthread_t socket_thread;
3340
struct client *clients;
3441
struct waiter *next;
3542
};
3643

3744
static struct waiter *waiters = NULL;
3845

39-
static void *container_wait_thread(void *arg)
46+
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
47+
48+
static void *wait_thread(void *arg)
4049
{
4150
uint8_t tx_buf[TX_BUFFER_SIZE];
4251
struct waiter *waiter = (struct waiter *)arg;
4352

53+
fprintf(stderr, "Wait thread started\n");
54+
4455
/* Call the actual function */
4556
waiter->result = ocre_container_wait(waiter->container, &waiter->exit_status);
4657

@@ -82,6 +93,8 @@ static void *container_wait_thread(void *arg)
8293
struct client *client = NULL;
8394
struct client *elt = NULL;
8495

96+
pthread_mutex_lock(&waiter->mutex);
97+
8598
LL_FOREACH_SAFE(waiter->clients, client, elt)
8699
{
87100
if (send(client->socket, tx_buf, response_len, 0) < 0) {
@@ -97,7 +110,93 @@ static void *container_wait_thread(void *arg)
97110
LL_DELETE(waiter->clients, client);
98111
}
99112

100-
LL_DELETE(waiters, waiter);
113+
fprintf(stderr, "WAIT FOR SOCKET THREAD TO FINISH\n");
114+
115+
waiter->finished = true;
116+
117+
pthread_mutex_unlock(&waiter->mutex);
118+
119+
// if (!waiter->finished) {
120+
/* Wait for the socket thread to finish */
121+
pthread_join(waiter->socket_thread, NULL);
122+
// }
123+
124+
fprintf(stderr, "SOCKET THREAD FINISHED\n");
125+
126+
fprintf(stderr, "returned\n");
127+
128+
return NULL;
129+
}
130+
131+
static void *socket_thread(void *arg)
132+
{
133+
struct client *client = NULL;
134+
struct waiter *waiter = (struct waiter *)arg;
135+
136+
while (true) {
137+
int count = 0;
138+
pthread_mutex_lock(&waiter->mutex);
139+
140+
if (waiter->finished) {
141+
pthread_mutex_unlock(&waiter->mutex);
142+
return NULL;
143+
};
144+
145+
LL_COUNT(waiter->clients, client, count);
146+
147+
fprintf(stderr, "Number of clients: %d.\n", count);
148+
149+
uint8_t rx_buf[RX_BUFFER_SIZE];
150+
151+
struct pollfd *pfds = malloc(sizeof(struct pollfd) * count);
152+
if (!pfds) {
153+
perror("malloc");
154+
pthread_mutex_unlock(&waiter->mutex);
155+
return NULL;
156+
}
157+
158+
int i = 0;
159+
LL_FOREACH(waiter->clients, client)
160+
{
161+
pfds[i].fd = client->socket;
162+
pfds[i].events = POLLIN;
163+
i++;
164+
}
165+
166+
pthread_mutex_unlock(&waiter->mutex);
167+
168+
int ret = poll(pfds, count, 2500);
169+
if (ret < 0) {
170+
perror("poll");
171+
return NULL;
172+
}
173+
174+
for (i = 0; i < count; i++) {
175+
if (pfds[i].revents & POLLIN) {
176+
fprintf(stderr, "poll in from %d\n", pfds[i].fd);
177+
ssize_t n = recv(pfds[i].fd, rx_buf, sizeof(rx_buf), 0);
178+
if (n <= 0) {
179+
if (n < 0)
180+
perror("recv");
181+
else
182+
printf("Client disconnected\n");
183+
184+
close(pfds[i].fd);
185+
186+
pthread_mutex_lock(&waiter->mutex);
187+
188+
pthread_mutex_unlock(&waiter->mutex);
189+
break;
190+
}
191+
192+
// if (recv(pfds[i].fd, rx_buf, RX_BUFFER_SIZE, 0) < 0) {
193+
// perror("recv");
194+
// }
195+
}
196+
}
197+
198+
free(pfds);
199+
}
101200

102201
return NULL;
103202
}
@@ -125,9 +224,26 @@ static struct waiter *waiter_get_or_new(struct ocre_container *container)
125224

126225
waiter->container = container;
127226

128-
int rc = pthread_create(&waiter->thread, NULL, container_wait_thread, waiter);
227+
int rc = pthread_mutex_init(&waiter->mutex, NULL);
129228
if (rc) {
130-
fprintf(stderr, "Failed to create thread: rc=%d", rc);
229+
fprintf(stderr, "Failed to initialize mutex: rc=%d", rc);
230+
free(waiter);
231+
return NULL;
232+
}
233+
234+
pthread_mutex_lock(&waiter->mutex);
235+
236+
fprintf(stderr, "Creating socket thread for container %p\n", container);
237+
rc = pthread_create(&waiter->socket_thread, NULL, socket_thread, waiter);
238+
if (rc) {
239+
fprintf(stderr, "Failed to create socket thread: rc=%d", rc);
240+
free(waiter);
241+
return NULL;
242+
}
243+
244+
rc = pthread_create(&waiter->wait_thread, NULL, wait_thread, waiter);
245+
if (rc) {
246+
fprintf(stderr, "Failed to create waiter thread: rc=%d", rc);
131247
free(waiter);
132248
return NULL;
133249
}
@@ -153,8 +269,12 @@ static int waiter_add_client(struct waiter *waiter, int socket)
153269

154270
client->socket = socket;
155271

272+
pthread_mutex_lock(&waiter->mutex);
273+
156274
LL_APPEND(waiter->clients, client);
157275

276+
pthread_mutex_unlock(&waiter->mutex);
277+
158278
return 0;
159279
}
160280

0 commit comments

Comments
 (0)