1313import com .github .dockerjava .api .exception .NotModifiedException ;
1414import com .github .dockerjava .api .exception .UnauthorizedException ;
1515import com .github .dockerjava .api .model .Frame ;
16+ import com .github .dockerjava .api .model .StreamType ;
1617import com .github .dockerjava .core .InvocationBuilder ;
17- import com .github .dockerjava .netty .handler .FramedResponseStreamHandler ;
18- import com .github .dockerjava .netty .handler .JsonResponseCallbackHandler ;
19- import io .netty .buffer .ByteBuf ;
20- import io .netty .buffer .Unpooled ;
21- import io .netty .channel .SimpleChannelInboundHandler ;
2218import lombok .AccessLevel ;
19+ import lombok .RequiredArgsConstructor ;
2320import lombok .SneakyThrows ;
2421import lombok .experimental .FieldDefaults ;
2522import lombok .extern .slf4j .Slf4j ;
3431import okio .BufferedSource ;
3532import okio .Okio ;
3633import okio .Source ;
34+ import org .testcontainers .DockerClientFactory ;
3735
3836import javax .annotation .Nullable ;
3937import java .io .ByteArrayInputStream ;
4038import java .io .IOException ;
4139import java .io .InputStream ;
4240import java .lang .reflect .Field ;
43- import java .util .concurrent .atomic .AtomicBoolean ;
41+ import java .util .Objects ;
42+ import java .util .function .Consumer ;
4443
4544@ Slf4j
4645@ FieldDefaults (level = AccessLevel .PRIVATE , makeFinal = true )
@@ -91,7 +90,7 @@ public void get(ResultCallback<Frame> resultCallback) {
9190 executeAndStream (
9291 request ,
9392 resultCallback ,
94- new FramedResponseStreamHandler (resultCallback )
93+ new FramedSink (resultCallback )
9594 );
9695 }
9796
@@ -187,7 +186,7 @@ public void run() {
187186 okHttpClient ,
188187 request ,
189188 resultCallback ,
190- new FramedResponseStreamHandler (resultCallback )
189+ new FramedSink (resultCallback )
191190 );
192191 }
193192
@@ -200,7 +199,7 @@ public <T> void post(TypeReference<T> typeReference, ResultCallback<T> resultCal
200199 executeAndStream (
201200 request ,
202201 resultCallback ,
203- new JsonResponseCallbackHandler <>(typeReference , resultCallback )
202+ new JsonSink <>(objectMapper , typeReference , resultCallback )
204203 );
205204 }
206205
@@ -247,7 +246,7 @@ public MediaType contentType() {
247246
248247 @ Override
249248 public void writeTo (BufferedSink sink ) throws IOException {
250- try (Source source = Okio .source (body )) {
249+ try (Source source = Okio .source (body )) {
251250 sink .writeAll (source );
252251 }
253252 }
@@ -286,49 +285,98 @@ protected Response execute(OkHttpClient okHttpClient, Request request) {
286285 }
287286 }
288287
289- protected <T > void executeAndStream (Request request , ResultCallback <T > callback , SimpleChannelInboundHandler < ByteBuf > handler ) {
290- executeAndStream (okHttpClient , request , callback , handler );
288+ protected <T > void executeAndStream (Request request , ResultCallback <T > callback , Consumer < BufferedSource > sourceConsumer ) {
289+ executeAndStream (okHttpClient , request , callback , sourceConsumer );
291290 }
292291
293- protected <T > void executeAndStream (OkHttpClient okHttpClient , Request request , ResultCallback <T > callback , SimpleChannelInboundHandler <ByteBuf > handler ) {
294- // TODO proper thread management
295- Thread thread = new Thread () {
296- @ Override
297- @ SneakyThrows
298- public void run () {
299- try (
300- Response response = execute (okHttpClient , request .newBuilder ().tag ("streaming" ).build ());
301- BufferedSource source = response .body ().source ();
302- InputStream inputStream = source .inputStream ();
303- ) {
304- AtomicBoolean shouldStop = new AtomicBoolean ();
305- callback .onStart (() -> {
306- shouldStop .set (true );
307- response .close ();
308- });
309-
310- byte [] buffer = new byte [4 * 1024 ];
311- while (!(shouldStop .get () || source .exhausted ())) {
312- int bytesReceived = inputStream .read (buffer );
313-
314- int offset = 0 ;
315- for (int i = 0 ; i < bytesReceived ; i ++) {
316- // some handlers like JsonResponseCallbackHandler do not work with multi-line buffers
317- boolean isLineBreak = buffer [i ] == '\n' ;
318- boolean isEndOfBuffer = i == bytesReceived - 1 ;
319- if (isLineBreak || isEndOfBuffer ) {
320- handler .channelRead (null , Unpooled .wrappedBuffer (buffer , offset , i - offset + 1 ));
321- offset = i + 1 ;
322- }
323- }
292+ protected <T > void executeAndStream (OkHttpClient okHttpClient , Request request , ResultCallback <T > callback , Consumer <BufferedSource > sourceConsumer ) {
293+ Thread thread = new Thread (DockerClientFactory .TESTCONTAINERS_THREAD_GROUP , () -> {
294+ try (
295+ Response response = execute (okHttpClient , request .newBuilder ().tag ("streaming" ).build ());
296+ BufferedSource source = response .body ().source ();
297+ ) {
298+ callback .onStart (response );
299+ sourceConsumer .accept (source );
300+ callback .onComplete ();
301+ } catch (Exception e ) {
302+ callback .onError (e );
303+ }
304+ }, "tc-okhttp-stream-" + Objects .hashCode (request ));
305+ thread .setDaemon (true );
306+
307+ thread .start ();
308+ }
309+
310+ @ RequiredArgsConstructor
311+ @ FieldDefaults (makeFinal = true , level = AccessLevel .PRIVATE )
312+ private static class JsonSink <T > implements Consumer <BufferedSource > {
313+
314+ ObjectMapper objectMapper ;
315+
316+ TypeReference <T > typeReference ;
317+
318+ ResultCallback <T > resultCallback ;
319+
320+ @ Override
321+ public void accept (BufferedSource source ) {
322+ try {
323+ while (true ) {
324+ String line = source .readUtf8Line ();
325+ if (line == null ) {
326+ break ;
324327 }
325- callback .onComplete ();
326- } catch (Exception e ) {
327- callback .onError (e );
328+
329+ resultCallback .onNext (objectMapper .readValue (line , typeReference ));
328330 }
331+ } catch (Exception e ) {
332+ resultCallback .onError (e );
329333 }
330- };
334+ }
335+ }
331336
332- thread .start ();
337+ @ RequiredArgsConstructor
338+ @ FieldDefaults (makeFinal = true , level = AccessLevel .PRIVATE )
339+ private static class FramedSink implements Consumer <BufferedSource > {
340+
341+ private static final int HEADER_SIZE = 8 ;
342+
343+ ResultCallback <Frame > resultCallback ;
344+
345+ @ Override
346+ public void accept (BufferedSource source ) {
347+ try {
348+ while (!source .exhausted ()) {
349+ // See https://docs.docker.com/engine/api/v1.37/#operation/ContainerAttach
350+ if (!source .request (HEADER_SIZE )) {
351+ return ;
352+ }
353+ StreamType streamType = streamType (source .readByte ());
354+ source .skip (3 );
355+ int payloadSize = source .readInt ();
356+
357+ if (!source .request (payloadSize )) {
358+ return ;
359+ }
360+ byte [] payload = source .readByteArray (payloadSize );
361+
362+ resultCallback .onNext (new Frame (streamType , payload ));
363+ }
364+ } catch (Exception e ) {
365+ resultCallback .onError (e );
366+ }
367+ }
368+
369+ private static StreamType streamType (byte streamType ) {
370+ switch (streamType ) {
371+ case 0 :
372+ return StreamType .STDIN ;
373+ case 1 :
374+ return StreamType .STDOUT ;
375+ case 2 :
376+ return StreamType .STDERR ;
377+ default :
378+ return StreamType .RAW ;
379+ }
380+ }
333381 }
334382}
0 commit comments