From d1c01e6323203eb629ca75da0b5b6c879dd3b906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Thu, 21 Aug 2025 11:38:57 +0200 Subject: [PATCH 1/4] Introduce `CheckedItera{ble,tor}` to avoid wrapping `IOE`s --- .../internal/net/http/CheckedIterable.java | 50 ++++++++++++++ .../internal/net/http/CheckedIterator.java | 68 +++++++++++++++++++ .../jdk/internal/net/http/PullPublisher.java | 37 +++++++--- .../internal/net/http/RequestPublishers.java | 55 +++++++-------- 4 files changed, 167 insertions(+), 43 deletions(-) create mode 100644 src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java create mode 100644 src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java new file mode 100644 index 0000000000000..f28b57b956591 --- /dev/null +++ b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.internal.net.http; + +import java.util.Iterator; + +/** + * An {@link Iterable} clone for {@link CheckedIterator}. + * + * @param the type of elements returned by the produced iterators + */ +@FunctionalInterface +interface CheckedIterable { + + /** + * {@return an {@linkplain CheckedIterator iterator} over elements of type {@code E}} + */ + CheckedIterator iterator(); + + static CheckedIterable fromIterable(Iterable iterable) { + return () -> { + Iterator iterator = iterable.iterator(); + return CheckedIterator.fromIterator(iterator); + }; + } + +} diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java new file mode 100644 index 0000000000000..fe10785abb75f --- /dev/null +++ b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.internal.net.http; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * An {@link java.util.Iterator} clone supporting checked exceptions. + * + * @param the type of elements returned by this iterator + */ +interface CheckedIterator { + + /** + * {@return {@code true} if the iteration has more elements} + * @throws Exception if operation fails + */ + boolean hasNext() throws Exception; + + /** + * {@return the next element in the iteration} + * + * @throws NoSuchElementException if the iteration has no more elements + * @throws Exception if operation fails + */ + E next() throws Exception; + + static CheckedIterator fromIterator(Iterator iterator) { + return new CheckedIterator<>() { + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public E next() { + return iterator.next(); + } + + }; + } + +} diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java index 0556214648e41..4ac00d1f51e87 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java @@ -25,14 +25,14 @@ package jdk.internal.net.http; -import java.util.Iterator; import java.util.concurrent.Flow; + import jdk.internal.net.http.common.Demand; import jdk.internal.net.http.common.SequentialScheduler; /** - * A Publisher that publishes items obtained from the given Iterable. Each new - * subscription gets a new Iterator. + * A {@linkplain Flow.Publisher publisher} that publishes items obtained from the given {@link CheckedIterator} supplier. + * Each new subscription gets a new {@code CheckedIterator}. */ class PullPublisher implements Flow.Publisher { @@ -40,15 +40,21 @@ class PullPublisher implements Flow.Publisher { // non-null when an error has been encountered, by the creator of // PullPublisher, while subscribing the subscriber, but before subscribe has // completed. - private final Iterable iterable; + private final CheckedIterable iterable; private final Throwable throwable; - PullPublisher(Iterable iterable, Throwable throwable) { + PullPublisher(CheckedIterable iterable, Throwable throwable) { + if ((iterable == null && throwable == null) || (iterable != null && throwable != null)) { + String message = String.format( + "only one of `iterable` or `throwable` can be null, but %s are", + throwable == null ? "both" : "none"); + throw new IllegalArgumentException(message); + } this.iterable = iterable; this.throwable = throwable; } - PullPublisher(Iterable iterable) { + PullPublisher(CheckedIterable iterable) { this(iterable, null); } @@ -56,10 +62,8 @@ class PullPublisher implements Flow.Publisher { public void subscribe(Flow.Subscriber subscriber) { Subscription sub; if (throwable != null) { - assert iterable == null : "non-null iterable: " + iterable; sub = new Subscription(subscriber, null, throwable); } else { - assert throwable == null : "non-null exception: " + throwable; sub = new Subscription(subscriber, iterable.iterator(), null); } subscriber.onSubscribe(sub); @@ -72,7 +76,7 @@ public void subscribe(Flow.Subscriber subscriber) { private class Subscription implements Flow.Subscription { private final Flow.Subscriber subscriber; - private final Iterator iter; + private final CheckedIterator iter; private volatile boolean completed; private volatile boolean cancelled; private volatile Throwable error; @@ -80,7 +84,7 @@ private class Subscription implements Flow.Subscription { private final Demand demand = new Demand(); Subscription(Flow.Subscriber subscriber, - Iterator iter, + CheckedIterator iter, Throwable throwable) { this.subscriber = subscriber; this.iter = iter; @@ -117,7 +121,18 @@ protected void run() { } subscriber.onNext(next); } - if (!iter.hasNext() && !cancelled) { + + boolean hasNext; + try { + hasNext = iter.hasNext(); + } catch (Exception e) { + completed = true; + pullScheduler.stop(); + subscriber.onError(e); + return; + } + + if (!hasNext && !cancelled) { completed = true; pullScheduler.stop(); subscriber.onComplete(); diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index dd5443c503567..f230a752117f1 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -38,7 +38,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; @@ -98,7 +97,7 @@ List copy(byte[] content, int offset, int length) { @Override public void subscribe(Flow.Subscriber subscriber) { List copy = copy(content, offset, length); - var delegate = new PullPublisher<>(copy); + var delegate = new PullPublisher<>(CheckedIterable.fromIterable(copy)); delegate.subscribe(subscriber); } @@ -110,26 +109,30 @@ public long contentLength() { // This implementation has lots of room for improvement. public static class IterablePublisher implements BodyPublisher { - private final Iterable content; + + private final CheckedIterable content; private volatile long contentLength; public IterablePublisher(Iterable content) { - this.content = Objects.requireNonNull(content); + Objects.requireNonNull(content, "content"); + this.content = CheckedIterable.fromIterable(content); } // The ByteBufferIterator will iterate over the byte[] arrays in // the content one at the time. // - class ByteBufferIterator implements Iterator { + class ByteBufferIterator implements CheckedIterator { + final ConcurrentLinkedQueue buffers = new ConcurrentLinkedQueue<>(); - final Iterator iterator = content.iterator(); + final CheckedIterator iterator = content.iterator(); + @Override - public boolean hasNext() { + public boolean hasNext() throws Exception { return !buffers.isEmpty() || iterator.hasNext(); } @Override - public ByteBuffer next() { + public ByteBuffer next() throws Exception { ByteBuffer buffer = buffers.poll(); while (buffer == null) { copy(); @@ -142,7 +145,7 @@ ByteBuffer getBuffer() { return Utils.getBuffer(); } - void copy() { + void copy() throws Exception { byte[] bytes = iterator.next(); int length = bytes.length; if (length == 0 && iterator.hasNext()) { @@ -165,18 +168,18 @@ void copy() { } } - public Iterator iterator() { + CheckedIterator iterator() { return new ByteBufferIterator(); } @Override public void subscribe(Flow.Subscriber subscriber) { - Iterable iterable = this::iterator; + CheckedIterable iterable = this::iterator; var delegate = new PullPublisher<>(iterable); delegate.subscribe(subscriber); } - static long computeLength(Iterable bytes) { + static long computeLength(CheckedIterable bytes) { // Avoid iterating just for the purpose of computing // a length, in case iterating is a costly operation // For HTTP/1.1 it means we will be using chunk encoding @@ -207,7 +210,7 @@ public StringPublisher(String content, Charset charset) { public static class EmptyPublisher implements BodyPublisher { private final Flow.Publisher delegate = - new PullPublisher(Collections.emptyList(), null); + new PullPublisher<>(CheckedIterable.fromIterable(Collections.emptyList()), null); @Override public long contentLength() { @@ -289,7 +292,7 @@ public long contentLength() { /** * Reads one buffer ahead all the time, blocking in hasNext() */ - public static class StreamIterator implements Iterator { + public static class StreamIterator implements CheckedIterator { final InputStream is; final Supplier bufSupplier; private volatile boolean eof; @@ -330,20 +333,8 @@ private int read() throws IOException { return n; } - /** - * Close stream in this instance. - * UncheckedIOException may be thrown if IOE happens at InputStream::close. - */ - private void closeStream() { - try { - is.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - @Override - public boolean hasNext() { + public boolean hasNext() throws IOException { stateLock.lock(); try { return hasNext0(); @@ -352,7 +343,7 @@ public boolean hasNext() { } } - private boolean hasNext0() { + private boolean hasNext0() throws IOException { if (need2Read) { try { haveNext = read() != -1; @@ -362,10 +353,10 @@ private boolean hasNext0() { } catch (IOException e) { haveNext = false; need2Read = false; - throw new UncheckedIOException(e); + throw new IOException(e); } finally { if (!haveNext) { - closeStream(); + is.close(); } } } @@ -373,7 +364,7 @@ private boolean hasNext0() { } @Override - public ByteBuffer next() { + public ByteBuffer next() throws IOException { stateLock.lock(); try { if (!hasNext()) { @@ -408,7 +399,7 @@ public void subscribe(Flow.Subscriber subscriber) { publisher.subscribe(subscriber); } - protected Iterable iterableOf(InputStream is) { + CheckedIterable iterableOf(InputStream is) { return () -> new StreamIterator(is); } From 53f483bba96b68f4d5a0beec73d8a00aabb43b1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 22 Aug 2025 10:47:13 +0200 Subject: [PATCH 2/4] Simplify `iterable`-vs-`throwable` check --- .../share/classes/jdk/internal/net/http/PullPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java index 4ac00d1f51e87..f3b4035f8c7ea 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java @@ -44,7 +44,7 @@ class PullPublisher implements Flow.Publisher { private final Throwable throwable; PullPublisher(CheckedIterable iterable, Throwable throwable) { - if ((iterable == null && throwable == null) || (iterable != null && throwable != null)) { + if ((iterable == null) == (throwable == null)) { String message = String.format( "only one of `iterable` or `throwable` can be null, but %s are", throwable == null ? "both" : "none"); From a68707c6105e29361bbf5de929e4577738371c15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 22 Aug 2025 10:51:41 +0200 Subject: [PATCH 3/4] Improve wording on nullability --- .../share/classes/jdk/internal/net/http/PullPublisher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java index f3b4035f8c7ea..2012aeafea2be 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java @@ -36,7 +36,7 @@ */ class PullPublisher implements Flow.Publisher { - // Only one of `iterable` and `throwable` can be non-null. throwable is + // Only one of `iterable` or `throwable` should be null, and the other non-null. throwable is // non-null when an error has been encountered, by the creator of // PullPublisher, while subscribing the subscriber, but before subscribe has // completed. @@ -46,7 +46,7 @@ class PullPublisher implements Flow.Publisher { PullPublisher(CheckedIterable iterable, Throwable throwable) { if ((iterable == null) == (throwable == null)) { String message = String.format( - "only one of `iterable` or `throwable` can be null, but %s are", + "only one of `iterable` or `throwable` should be null, and the other non-null, but %s are null", throwable == null ? "both" : "none"); throw new IllegalArgumentException(message); } From 89dba9a14cdf47e9833e724c8b1385a9d892e3cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 22 Aug 2025 10:56:46 +0200 Subject: [PATCH 4/4] Revert redundant changes --- .../internal/net/http/RequestPublishers.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index f230a752117f1..81cef1f330ca1 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -38,6 +38,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; @@ -109,30 +110,26 @@ public long contentLength() { // This implementation has lots of room for improvement. public static class IterablePublisher implements BodyPublisher { - - private final CheckedIterable content; + private final Iterable content; private volatile long contentLength; public IterablePublisher(Iterable content) { - Objects.requireNonNull(content, "content"); - this.content = CheckedIterable.fromIterable(content); + this.content = Objects.requireNonNull(content); } // The ByteBufferIterator will iterate over the byte[] arrays in // the content one at the time. // - class ByteBufferIterator implements CheckedIterator { - + class ByteBufferIterator implements Iterator { final ConcurrentLinkedQueue buffers = new ConcurrentLinkedQueue<>(); - final CheckedIterator iterator = content.iterator(); - + final Iterator iterator = content.iterator(); @Override - public boolean hasNext() throws Exception { + public boolean hasNext() { return !buffers.isEmpty() || iterator.hasNext(); } @Override - public ByteBuffer next() throws Exception { + public ByteBuffer next() { ByteBuffer buffer = buffers.poll(); while (buffer == null) { copy(); @@ -145,7 +142,7 @@ ByteBuffer getBuffer() { return Utils.getBuffer(); } - void copy() throws Exception { + void copy() { byte[] bytes = iterator.next(); int length = bytes.length; if (length == 0 && iterator.hasNext()) { @@ -168,18 +165,18 @@ void copy() throws Exception { } } - CheckedIterator iterator() { + public Iterator iterator() { return new ByteBufferIterator(); } @Override public void subscribe(Flow.Subscriber subscriber) { - CheckedIterable iterable = this::iterator; + CheckedIterable iterable = () -> CheckedIterator.fromIterator(iterator()); var delegate = new PullPublisher<>(iterable); delegate.subscribe(subscriber); } - static long computeLength(CheckedIterable bytes) { + static long computeLength(Iterable bytes) { // Avoid iterating just for the purpose of computing // a length, in case iterating is a costly operation // For HTTP/1.1 it means we will be using chunk encoding