Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ sqFileThisSession() {
}

void
signalOnDataArrival(int fd, void *clientData, int flag){
signalOnDataArrival(sqInt fd, void *clientData, int flag){
interpreterProxy->signalSemaphoreWithIndex((sqInt)clientData);
aioDisable(fd);
}
Expand Down
106 changes: 72 additions & 34 deletions extracted/plugins/SocketPlugin/src/common/SocketPluginImpl.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ typedef struct privateSocketStruct
int multiListen; /* whether to listen for multiple connections */
int acceptedSock; /* a connection that has been accepted */
int socketType;
int waitingToSend;
} privateSocketStruct;

#define CONN_NOTIFY (1<<0)
Expand Down Expand Up @@ -261,7 +262,7 @@ typedef struct privateSocketStruct
#define SOCKETERROR(S) (PSP(S)->sockError)
#define SOCKETPEER(S) (PSP(S)->peer)
#define SOCKETPEERSIZE(S) (PSP(S)->peerSize)

#define SOCKET_WATINGTOSEND(S) (PSP(S)->waitingToSend)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] SOCKET_WATINGTOSEND -> SOCKET_WAITINGTOSEND


/*** Resolver state ***/

Expand Down Expand Up @@ -304,6 +305,7 @@ char *socketHandlerName(aioHandler h)
if (h == connectHandler) return "connectHandler";
if (h == dataHandler) return "dataHandler";
if (h == closeHandler) return "closeHandler";
if (h == sendHandler) return "sendHandler";
return "***unknownHandler***";
}
#endif
Expand Down Expand Up @@ -475,7 +477,8 @@ static void acceptHandler(sqInt fd, void *data, int flags)
pss->sockError= socketError(fd);
pss->sockState= Invalid;
pss->s= -1;
closesocket(fd);
pss->waitingToSend = false;
closesocket(fd);
logTrace("acceptHandler: aborting server %d pss=%p\n", fd, pss);
}
else /* (flags & AIO_R) -- accept() is ready */
Expand Down Expand Up @@ -517,7 +520,8 @@ static void acceptHandler(sqInt fd, void *data, int flags)
aioDisable(fd);
closesocket(fd);
pss->s= newSock;
aioEnable(newSock, pss, 0);
pss->waitingToSend = false;
aioEnable(newSock, pss, 0);
}
}
}
Expand Down Expand Up @@ -574,7 +578,25 @@ static void connectHandler(sqInt fd, void *data, int flags)
}


/* read or write data transfer is now possible for the socket. */
/* send data transfer is now possible for the socket. */

static void sendHandler(sqInt fd, void *data, int flags)
{
privateSocketStruct *pss= (privateSocketStruct *)data;
logTrace("sendHandler(%d=%d, %p, %d)\n", fd, pss->s, data, flags);

if (pss == NULL)
{
logTrace("sendHandler: pss is NULL fd=%d data=%p flags=0x%x\n", fd, data, flags);
return;
}

pss->waitingToSend = false;

notify(pss, WRITE_NOTIFY);
}

/* read data transfer is now possible for the socket. */

static void dataHandler(sqInt fd, void *data, int flags)
{
Expand Down Expand Up @@ -614,8 +636,8 @@ static void dataHandler(sqInt fd, void *data, int flags)
int n= recv(fd, (void *)buf, 1, MSG_OOB);
if (n == 1) logTrace("socket: received OOB data: %02x\n", buf[0]);
}
if (flags & AIO_R) notify(pss, READ_NOTIFY);
if (flags & AIO_W) notify(pss, WRITE_NOTIFY);

if (flags & AIO_R) notify(pss, READ_NOTIFY);
}


Expand All @@ -634,6 +656,8 @@ static void closeHandler(sqInt fd, void *data, int flags)
}
pss->sockState= Unconnected;
pss->s= -1;
pss->waitingToSend = false;

notify(pss, READ_NOTIFY | CONN_NOTIFY);
}

Expand Down Expand Up @@ -730,6 +754,7 @@ void sqSocketCreateNetTypeSocketTypeRecvBytesSendBytesSemaIDReadSemaIDWriteSemaI
return;
}
pss->s= newSocket;
pss->waitingToSend = false;
pss->connSema= semaIndex;
pss->readSema= readSemaIndex;
pss->writeSema= writeSemaIndex;
Expand Down Expand Up @@ -785,6 +810,7 @@ void sqSocketCreateRawProtoTypeRecvBytesSendBytesSemaIDReadSemaIDWriteSemaID(Soc
return;
}
pss->s= newSocket;
pss->waitingToSend = false;
pss->connSema= semaIndex;
pss->readSema= readSemaIndex;
pss->writeSema= writeSemaIndex;
Expand Down Expand Up @@ -991,6 +1017,7 @@ void sqSocketAcceptFromRecvBytesSendBytesSemaIDReadSemaIDWriteSemaID(SocketPtr s

_PSP(s)= pss;
pss->s= PSP(serverSocket)->acceptedSock;
pss->waitingToSend = false;
PSP(serverSocket)->acceptedSock= -1;
SOCKETSTATE(serverSocket)= WaitingForConnection;
aioHandle(SOCKET(serverSocket), acceptHandler, AIO_RX);
Expand Down Expand Up @@ -1224,9 +1251,8 @@ sqInt sqSocketSendDone(SocketPtr s)
if (!socketValid(s))
return false;

// If the socket is connected we just return true. Then the send/sendto might block, but we will use the event system
if(SOCKETSTATE(s) == Connected)
return true;
return !SOCKET_WATINGTOSEND(s);

return false;
}
Expand Down Expand Up @@ -1319,30 +1345,34 @@ sqInt sqSocketSendDataBufCount(SocketPtr s, char *buf, sqInt bufSize)
{
/* --- TCP --- */
logTrace( "TCP sendData(%d, %ld)\n", SOCKET(s), bufSize);
if ((nsent= send(SOCKET(s), buf, bufSize, 0)) <= 0)
{
lastError = getLastSocketError();
if ((nsent == -1) && (lastError == ERROR_WOULD_BLOCK))
{
logTrace( "TCP sendData(%d, %ld) -> %d [blocked]",
SOCKET(s), bufSize, nsent);
return 0;
}
else
{
/* error: most likely "connection closed by peer" */
SOCKETSTATE(s)= OtherEndClosed;
SOCKETERROR(s)= lastError;
logWarn("errno %d\n", lastError);
logWarnFromErrno("write");
if ((nsent= send(SOCKET(s), buf, bufSize, 0)) <= 0){
lastError = getLastSocketError();
if ((nsent == -1) && (lastError == ERROR_WOULD_BLOCK))
{
logTrace( "TCP sendData(%d, %ld) -> %d [blocked]", SOCKET(s), bufSize, nsent);
SOCKET_WATINGTOSEND(s) = true;
aioHandle(SOCKET(s), sendHandler, AIO_WX);
return 0;
}
else
{
/* error: most likely "connection closed by peer" */
SOCKETSTATE(s)= OtherEndClosed;
SOCKETERROR(s)= lastError;
SOCKET_WATINGTOSEND(s) = false;

return 0;
}
}
logWarn("errno %d\n", lastError);
logWarnFromErrno("write");
SOCKET_WATINGTOSEND(s) = false;

return 0;
}
}
}
/* write completed synchronously */
logTrace( "sendData(%d) done = %d\n", SOCKET(s), nsent);
return nsent;
/* write completed synchronously */
logTrace( "sendData(%d) done = %d\n", SOCKET(s), nsent);
SOCKET_WATINGTOSEND(s) = false;
return nsent;
}


Expand Down Expand Up @@ -1395,13 +1425,21 @@ sqInt sqSockettoHostportSendDataBufCount(SocketPtr s, sqInt address, sqInt port,
saddr.sin_addr.s_addr= htonl(address);
{
int nsent= sendto(SOCKET(s), buf, bufSize, 0, (struct sockaddr *)&saddr, sizeof(saddr));
if (nsent >= 0)
return nsent;
if (nsent >= 0){
SOCKET_WATINGTOSEND(s) = false;
return nsent;
}

int lastError = getLastSocketError();

if (lastError == ERROR_WOULD_BLOCK) /* asynchronous write in progress */
return 0;
if (lastError == ERROR_WOULD_BLOCK) {
SOCKET_WATINGTOSEND(s) = true;
aioHandle(SOCKET(s), sendHandler, AIO_WX);

/* asynchronous write in progress */
return 0;
}

logTrace( "UDP send failed\n");
SOCKETERROR(s)= lastError;
}
Expand Down