Skip to content

Commit afc3a7d

Browse files
committed
Add protocol for signalling consumption from consumer to producer
IPC message can optionally transfer buffers from the producer to the consumer. These buffers are not copied while the message is in transit. Consequently, the consumer must inform the producer when it has finished processing the buffer. This patch adds a monitor variable that the producer uses to wait for the competion of the consumers message handling. Once done, the consumer informs the producer to continue. The reply can also return a buffer back to the consumer. The lifetime of this buffer is managed by a higher-level protocol.
1 parent aec636c commit afc3a7d

File tree

3 files changed

+193
-24
lines changed

3 files changed

+193
-24
lines changed

src/IPCQueue.c

Lines changed: 147 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,46 +4,150 @@
44

55
#include "IPCQueue.h"
66

7+
typedef struct
8+
{
9+
uint32_t mDWord0;
10+
uint32_t mDWord1;
11+
uint32_t mStatus;
12+
void* mBuffer;
13+
} IPCMessageReply;
14+
715
/*
816
* IPCMessage
917
*/
1018

1119
int
1220
IPCMessageInit(IPCMessage* aMsg)
1321
{
22+
aMsg->mMonitor = xQueueCreate(1, sizeof(IPCMessageReply));
23+
if (!aMsg->mMonitor) {
24+
return -1;
25+
}
26+
1427
aMsg->mDWord0 = 0;
1528
aMsg->mDWord1 = 0;
16-
aMsg->mStatus = 0;
29+
aMsg->mStatus = IPC_MESSAGE_STATE_CLEAR;
1730
aMsg->mBuffer = NULL;
1831

1932
return 0;
2033
}
2134

35+
void
36+
IPCMessageUninit(IPCMessage* aMsg)
37+
{
38+
vQueueDelete(aMsg->mMonitor);
39+
}
40+
2241
uint32_t
2342
IPCMessageGetBufferLength(const IPCMessage* aMsg)
2443
{
2544
return aMsg->mStatus & 0x00ffffff;
2645
}
2746

2847
int
29-
IPCMessageProduce(IPCMessage* aMsg)
48+
IPCMessageProduce(IPCMessage* aMsg, uint32_t aLength, void* aBuffer)
49+
{
50+
switch (aMsg->mStatus & 0xf0000000) {
51+
case IPC_MESSAGE_STATE_CLEAR: /* fall through */
52+
case IPC_MESSAGE_STATE_PRODUCED: /* fall through */
53+
case IPC_MESSAGE_STATE_ERROR:
54+
/* We're good if the message is currently not in transit. */
55+
break;
56+
case IPC_MESSAGE_STATE_PENDING: /* fall through */
57+
default:
58+
/* If the message is currently in transit or the status is
59+
* unknown, we don't produce a new one. Better abort here. */
60+
return -1;
61+
}
62+
63+
aMsg->mDWord0 = 0;
64+
aMsg->mDWord1 = 0;
65+
aMsg->mStatus = 0;
66+
aMsg->mStatus |= IPC_MESSAGE_STATE_PRODUCED;
67+
aMsg->mStatus |= aLength;
68+
aMsg->mBuffer = aBuffer;
69+
70+
return 0;
71+
}
72+
73+
static int
74+
WaitForConsumption(IPCMessage* aMsg, IPCMessageReply* aReply)
75+
{
76+
uint32_t state = aMsg->mStatus & 0xf0000000;
77+
if (state != IPC_MESSAGE_STATE_PENDING) {
78+
return -1;
79+
}
80+
BaseType_t ret = xQueueReceive(aMsg->mMonitor, aReply, portMAX_DELAY);
81+
if (ret != pdPASS) {
82+
return -1;
83+
};
84+
return 0;
85+
}
86+
87+
int
88+
IPCMessageWaitForReply(IPCMessage* aMsg)
3089
{
31-
/* TODO: At some point we have to implement efficient IPC with
32-
* large buffers. IPCMessageProduce() will signal the end of the
33-
* message constrcution **on the producer tast.** A produced
34-
* message can be send over over an IPC queue to a consumer task.
35-
* The consumer calls IPCMessageConsume() after it processed the
36-
* buffer. The producer can then release the buffer. */
90+
IPCMessageReply reply;
91+
int res = WaitForConsumption(aMsg, &reply);
92+
if (res < 0) {
93+
return -1;
94+
};
95+
aMsg->mDWord0 = reply.mDWord0;
96+
aMsg->mDWord1 = reply.mDWord1;
97+
aMsg->mStatus = reply.mStatus;
98+
aMsg->mBuffer = reply.mBuffer;
3799
return 0;
38100
}
39101

40102
int
41-
IPCMessageConsume(IPCMessage* aMsg)
103+
IPCMessageWaitForConsumption(IPCMessage* aMsg)
42104
{
43-
/* TODO: See IPCMessageProduce() */
105+
IPCMessageReply reply;
106+
int res = WaitForConsumption(aMsg, &reply);
107+
if (res < 0) {
108+
return -1;
109+
};
110+
aMsg->mStatus &= 0x0fffffff; /* clear pending status */
111+
return 0;
112+
}
113+
114+
static int
115+
ConsumeAndReply(IPCMessage* aMsg, const IPCMessageReply* aReply)
116+
{
117+
BaseType_t res = xQueueSend(aMsg->mMonitor, &aReply, 0);
118+
if (res != pdPASS) {
119+
return -1;
120+
}
44121
return 0;
45122
}
46123

124+
int
125+
IPCMessageConsumeAndReply(IPCMessage* aMsg,
126+
uint32_t aDWord0, uint32_t aDWord1,
127+
uint32_t aFlags, uint32_t aLength,
128+
void* aBuffer)
129+
{
130+
IPCMessageReply reply = {
131+
.mDWord0 = aDWord0,
132+
.mDWord1 = aDWord1,
133+
.mStatus = aFlags | aLength,
134+
.mBuffer = aBuffer
135+
};
136+
return ConsumeAndReply(aMsg, &reply);
137+
}
138+
139+
int
140+
IPCMessageConsume(IPCMessage* aMsg)
141+
{
142+
static const IPCMessageReply sReply = {
143+
.mDWord0 = 0,
144+
.mDWord1 = 0,
145+
.mStatus = IPC_MESSAGE_STATE_CLEAR,
146+
.mBuffer = NULL,
147+
};
148+
return ConsumeAndReply(aMsg, &sReply);
149+
}
150+
47151
/*
48152
* IPCMessageQueue
49153
*/
@@ -58,14 +162,42 @@ IPCMessageQueueInit(IPCMessageQueue* aMsgQueue)
58162
return 0;
59163
}
60164

165+
void
166+
IPCMessageQueueUninit(IPCMessageQueue* aMsgQueue)
167+
{
168+
vQueueDelete(aMsgQueue->mWaitQueue);
169+
}
170+
61171
int
62172
IPCMessageQueueConsume(IPCMessageQueue* aMsgQueue, IPCMessage* aMsg)
63173
{
64-
BaseType_t res = xQueueSend(aMsgQueue->mWaitQueue, aMsg, 0);
65-
if (res != pdPASS){
66-
return -1;
67-
}
68-
return 0;
174+
uint32_t status = aMsg->mStatus;
175+
176+
switch (status & 0xf0000000) {
177+
case IPC_MESSAGE_STATE_PRODUCED: /* fall through */
178+
/* We're good if the message has been produced correctly. */
179+
break;
180+
case IPC_MESSAGE_STATE_CLEAR: /* fall through */
181+
case IPC_MESSAGE_STATE_PENDING: /* fall through */
182+
case IPC_MESSAGE_STATE_ERROR: /* fall through */
183+
default:
184+
/* In any other case, the message is probably not ready for
185+
* consumption. Better abort here. */
186+
return -1;
187+
}
188+
189+
aMsg->mStatus &= 0x0fffffff;
190+
aMsg->mStatus |= IPC_MESSAGE_STATE_PENDING;
191+
192+
BaseType_t res = xQueueSend(aMsgQueue->mWaitQueue, aMsg, 0);
193+
if (res != pdPASS){
194+
goto err_xQueueSend;
195+
}
196+
return 0;
197+
198+
err_xQueueSend:
199+
aMsg->mStatus = status;
200+
return -1;
69201
}
70202

71203
int

src/IPCQueue.h

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,50 @@
1313
* IPCMessage
1414
*/
1515

16+
enum IPCMessageStatus {
17+
IPC_MESSAGE_STATE_CLEAR = 0x00000000,
18+
IPC_MESSAGE_STATE_PRODUCED = 0x10000000,
19+
IPC_MESSAGE_STATE_PENDING = 0x20000000,
20+
IPC_MESSAGE_STATE_ERROR = 0x30000000
21+
};
22+
1623
typedef struct
1724
{
25+
/* Internal monitor for signalling */
26+
QueueHandle_t mMonitor;
27+
1828
/* Two dword for data transfers. */
1929
uint32_t mDWord0;
2030
uint32_t mDWord1;
21-
/* Message flags [31:24] and buffer length [23:0] */
31+
/* Message state [31:28], flags [27:24], and buffer length [23:0] */
2232
uint32_t mStatus;
2333
/* Message buffer */
24-
const void* mBuffer;
34+
void* mBuffer;
2535
} IPCMessage;
2636

2737
int
2838
IPCMessageInit(IPCMessage* aMsg);
2939

40+
void
41+
IPCMessageUninit(IPCMessage* aMsg);
42+
3043
uint32_t
3144
IPCMessageGetBufferLength(const IPCMessage* aMsg);
3245

3346
int
34-
IPCMessageProduce(IPCMessage* aMsg);
47+
IPCMessageProduce(IPCMessage* aMsg, uint32_t aLength, void* aBuffer);
48+
49+
int
50+
IPCMessageWaitForReply(IPCMessage* aMsg);
51+
52+
int
53+
IPCMessageWaitForConsumption(IPCMessage* aMsg);
54+
55+
int
56+
IPCMessageConsumeAndReply(IPCMessage* aMsg,
57+
uint32_t aDWord0, uint32_t aDWord1,
58+
uint32_t aFlags, uint32_t aLength,
59+
void* aBuffer);
3560

3661
int
3762
IPCMessageConsume(IPCMessage* aMsg);

src/Producer.c

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,34 @@ Run(ProducerTask* aProducer)
1919
"This"," is"," a"," SensorWeb"," device"," by"," Mozilla.\n\r"
2020
};
2121

22+
IPCMessage msg;
23+
int res = IPCMessageInit(&msg);
24+
if (res < 0) {
25+
return;
26+
}
27+
2228
for (unsigned long i = 0;; i = (i + 1) % ArrayLength(sMessage)) {
23-
IPCMessage msg;
24-
int res = IPCMessageInit(&msg);
29+
30+
res = IPCMessageProduce(&msg, strlen(sMessage[i]) + 1, (void*)sMessage[i]);
2531
if (res < 0) {
2632
return;
2733
}
28-
msg.mBuffer = sMessage[i];
29-
msg.mStatus = strlen(msg.mBuffer) + 1;
30-
31-
IPCMessageProduce(&msg);
3234

3335
res = IPCMessageQueueConsume(aProducer->mSendQueue, &msg);
3436
if (res < 0) {
3537
return;
3638
}
39+
3740
vTaskDelay(TicksOfMSecs(200));
41+
42+
/* While we have been waiting in vTaskDelay(), the consumer
43+
* probably processed our message. Waiting for consumption should
44+
* have the reply ready.
45+
*/
46+
res = IPCMessageWaitForConsumption(&msg);
47+
if (res < 0) {
48+
return;
49+
}
3850
}
3951
}
4052

0 commit comments

Comments
 (0)