Skip to content

Commit 3336670

Browse files
committed
fix(ipc): handle multiple waiters
Signed-off-by: Marco Casaroli <marco.casaroli@gmail.com>
1 parent d4bef4a commit 3336670

3 files changed

Lines changed: 136 additions & 19 deletions

File tree

src/samples/supervisor/posix/client/client.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ struct ocre_container *ocre_context_create_container(struct ocre_context *contex
117117
uint8_t payload[LARGE_PAYLOAD_SIZE];
118118
bool success;
119119

120+
if (!context) {
121+
return NULL;
122+
}
123+
120124
/* Encode the request */
121125
ZCBOR_STATE_E(encoding_state, 0, payload, sizeof(payload), 0);
122126

src/samples/supervisor/posix/server/handlers.c

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,6 @@
3636
#define MAX_CONTAINERS 64
3737
#define MAX_STRING_ARRAY 32
3838

39-
/* Global ocre context for the server */
40-
// static struct ocre_context *ctx = NULL;
41-
42-
// static void print_hex(const char *label, const uint8_t *data, size_t len)
43-
// {
44-
// printf("HEX %s: ", label);
45-
// for (size_t i = 0; i < len; i++) {
46-
// printf("%02x ", data[i]);
47-
// }
48-
// printf("\n");
49-
// }
50-
5139
static int handle_context_create_container(struct ocre_context *ctx, zcbor_state_t *dec_state, uint8_t *tx_buf,
5240
size_t tx_buf_size)
5341
{

src/samples/supervisor/posix/server/waiters.c

Lines changed: 132 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
* SPDX-License-Identifier: Apache-2.0
66
*/
77

8+
#include <pthread.h>
89
#include <stdlib.h>
910
#include <string.h>
1011
#include <unistd.h>
1112
#include <stdio.h>
1213
#include <stdint.h>
14+
#include <poll.h>
1315

1416
#include <sys/socket.h>
1517

@@ -28,15 +30,22 @@ struct client {
2830
};
2931

3032
struct waiter {
33+
pthread_mutex_t mutex;
34+
bool finished;
35+
int exit_status;
36+
int result;
3137
struct ocre_container *container;
32-
pthread_t thread;
38+
pthread_t wait_thread;
39+
pthread_t socket_thread;
3340
struct client *clients;
3441
struct waiter *next;
3542
};
3643

3744
static struct waiter *waiters = NULL;
3845

39-
static void *container_wait_thread(void *arg)
46+
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
47+
48+
static void *wait_thread(void *arg)
4049
{
4150
uint8_t tx_buf[TX_BUFFER_SIZE];
4251
struct waiter *waiter = (struct waiter *)arg;
@@ -82,6 +91,8 @@ static void *container_wait_thread(void *arg)
8291
struct client *client = NULL;
8392
struct client *elt = NULL;
8493

94+
pthread_mutex_lock(&waiter->mutex);
95+
8596
LL_FOREACH_SAFE(waiter->clients, client, elt)
8697
{
8798
if (send(client->socket, tx_buf, response_len, 0) < 0) {
@@ -97,7 +108,85 @@ static void *container_wait_thread(void *arg)
97108
LL_DELETE(waiter->clients, client);
98109
}
99110

100-
LL_DELETE(waiters, waiter);
111+
waiter->finished = true;
112+
113+
pthread_mutex_unlock(&waiter->mutex);
114+
115+
/* Wait for the socket thread to finish */
116+
pthread_join(waiter->socket_thread, NULL);
117+
118+
return NULL;
119+
}
120+
121+
static void *socket_thread(void *arg)
122+
{
123+
struct client *client = NULL;
124+
struct waiter *waiter = (struct waiter *)arg;
125+
126+
while (true) {
127+
int count = 0;
128+
pthread_mutex_lock(&waiter->mutex);
129+
130+
if (waiter->finished) {
131+
pthread_mutex_unlock(&waiter->mutex);
132+
return NULL;
133+
};
134+
135+
LL_COUNT(waiter->clients, client, count);
136+
137+
fprintf(stderr, "Number of clients: %d.\n", count);
138+
139+
uint8_t rx_buf[RX_BUFFER_SIZE];
140+
141+
struct pollfd *pfds = malloc(sizeof(struct pollfd) * count);
142+
if (!pfds) {
143+
perror("malloc");
144+
pthread_mutex_unlock(&waiter->mutex);
145+
return NULL;
146+
}
147+
148+
int i = 0;
149+
LL_FOREACH(waiter->clients, client)
150+
{
151+
pfds[i].fd = client->socket;
152+
pfds[i].events = POLLIN;
153+
i++;
154+
}
155+
156+
pthread_mutex_unlock(&waiter->mutex);
157+
158+
int ret = poll(pfds, count, 2500);
159+
if (ret < 0) {
160+
perror("poll");
161+
return NULL;
162+
}
163+
164+
for (i = 0; i < count; i++) {
165+
if (pfds[i].revents & POLLIN) {
166+
fprintf(stderr, "poll in from %d\n", pfds[i].fd);
167+
ssize_t n = recv(pfds[i].fd, rx_buf, sizeof(rx_buf), 0);
168+
if (n <= 0) {
169+
if (n < 0) {
170+
perror("recv");
171+
} else {
172+
printf("Client disconnected\n");
173+
}
174+
175+
close(pfds[i].fd);
176+
177+
pthread_mutex_lock(&waiter->mutex);
178+
179+
// TODO: remove from client list
180+
181+
pthread_mutex_unlock(&waiter->mutex);
182+
183+
break;
184+
}
185+
}
186+
}
187+
188+
free(pfds);
189+
}
101190

102191
return NULL;
103192
}
@@ -125,11 +214,35 @@ static struct waiter *waiter_get_or_new(struct ocre_container *container)
125214

126215
waiter->container = container;
127216

128-
int rc = pthread_create(&waiter->thread, NULL, container_wait_thread, waiter);
217+
int rc = pthread_mutex_init(&waiter->mutex, NULL);
129218
if (rc) {
130-
fprintf(stderr, "Failed to create thread: rc=%d", rc);
131-
free(waiter);
132-
return NULL;
219+
fprintf(stderr, "Failed to initialize mutex: rc=%d", rc);
220+
goto error_waiter;
221+
}
222+
223+
rc = pthread_attr_init(&attr);
224+
if (rc) {
225+
fprintf(stderr, "Failed to initialize thread attributes: rc=%d", rc);
226+
goto error_mutex;
227+
}
228+
229+
rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
230+
if (rc) {
231+
fprintf(stderr, "Failed to initialize thread attributes: rc=%d", rc);
232+
goto error_attr;
233+
}
234+
235+
fprintf(stderr, "Creating socket thread for container %p\n", container);
236+
rc = pthread_create(&waiter->socket_thread, NULL, socket_thread, waiter);
237+
if (rc) {
238+
fprintf(stderr, "Failed to create socket thread: rc=%d", rc);
239+
goto error_mutex;
240+
}
241+
242+
rc = pthread_create(&waiter->wait_thread, &attr, wait_thread, waiter);
243+
if (rc) {
244+
fprintf(stderr, "Failed to create waiter thread: rc=%d", rc);
245+
goto error_thread;
133246
}
134247

135248
pthread_mutex_unlock(&waiter->mutex);
@@ -153,8 +266,20 @@ static int waiter_add_client(struct waiter *waiter, int socket)
153266

154267
client->socket = socket;
155268

269+
rc = pthread_mutex_lock(&waiter->mutex);
270+
if (rc) {
271+
fprintf(stderr, "Failed to lock mutex: rc=%d", rc);
272+
goto error_client;
273+
}
274+
156275
LL_APPEND(waiter->clients, client);
157276

277+
rc = pthread_mutex_unlock(&waiter->mutex);
278+
if (rc) {
279+
fprintf(stderr, "Failed to unlock mutex: rc=%d", rc);
280+
goto error_client;
281+
}
282+
158283
return 0;
159284
}
160285

0 commit comments

Comments
 (0)