diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java new file mode 100644 index 0000000000000..f28b57b956591 --- /dev/null +++ b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.internal.net.http; + +import java.util.Iterator; + +/** + * An {@link Iterable} clone for {@link CheckedIterator}. + * + * @param the type of elements returned by the produced iterators + */ +@FunctionalInterface +interface CheckedIterable { + + /** + * {@return an {@linkplain CheckedIterator iterator} over elements of type {@code E}} + */ + CheckedIterator iterator(); + + static CheckedIterable fromIterable(Iterable iterable) { + return () -> { + Iterator iterator = iterable.iterator(); + return CheckedIterator.fromIterator(iterator); + }; + } + +} diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java new file mode 100644 index 0000000000000..fe10785abb75f --- /dev/null +++ b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.internal.net.http; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * An {@link java.util.Iterator} clone supporting checked exceptions. + * + * @param the type of elements returned by this iterator + */ +interface CheckedIterator { + + /** + * {@return {@code true} if the iteration has more elements} + * @throws Exception if operation fails + */ + boolean hasNext() throws Exception; + + /** + * {@return the next element in the iteration} + * + * @throws NoSuchElementException if the iteration has no more elements + * @throws Exception if operation fails + */ + E next() throws Exception; + + static CheckedIterator fromIterator(Iterator iterator) { + return new CheckedIterator<>() { + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public E next() { + return iterator.next(); + } + + }; + } + +} diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java index 0556214648e41..2012aeafea2be 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java @@ -25,30 +25,36 @@ package jdk.internal.net.http; -import java.util.Iterator; import java.util.concurrent.Flow; + import jdk.internal.net.http.common.Demand; import jdk.internal.net.http.common.SequentialScheduler; /** - * A Publisher that publishes items obtained from the given Iterable. Each new - * subscription gets a new Iterator. + * A {@linkplain Flow.Publisher publisher} that publishes items obtained from the given {@link CheckedIterator} supplier. + * Each new subscription gets a new {@code CheckedIterator}. */ class PullPublisher implements Flow.Publisher { - // Only one of `iterable` and `throwable` can be non-null. throwable is + // Only one of `iterable` or `throwable` should be null, and the other non-null. throwable is // non-null when an error has been encountered, by the creator of // PullPublisher, while subscribing the subscriber, but before subscribe has // completed. - private final Iterable iterable; + private final CheckedIterable iterable; private final Throwable throwable; - PullPublisher(Iterable iterable, Throwable throwable) { + PullPublisher(CheckedIterable iterable, Throwable throwable) { + if ((iterable == null) == (throwable == null)) { + String message = String.format( + "only one of `iterable` or `throwable` should be null, and the other non-null, but %s are null", + throwable == null ? "both" : "none"); + throw new IllegalArgumentException(message); + } this.iterable = iterable; this.throwable = throwable; } - PullPublisher(Iterable iterable) { + PullPublisher(CheckedIterable iterable) { this(iterable, null); } @@ -56,10 +62,8 @@ class PullPublisher implements Flow.Publisher { public void subscribe(Flow.Subscriber subscriber) { Subscription sub; if (throwable != null) { - assert iterable == null : "non-null iterable: " + iterable; sub = new Subscription(subscriber, null, throwable); } else { - assert throwable == null : "non-null exception: " + throwable; sub = new Subscription(subscriber, iterable.iterator(), null); } subscriber.onSubscribe(sub); @@ -72,7 +76,7 @@ public void subscribe(Flow.Subscriber subscriber) { private class Subscription implements Flow.Subscription { private final Flow.Subscriber subscriber; - private final Iterator iter; + private final CheckedIterator iter; private volatile boolean completed; private volatile boolean cancelled; private volatile Throwable error; @@ -80,7 +84,7 @@ private class Subscription implements Flow.Subscription { private final Demand demand = new Demand(); Subscription(Flow.Subscriber subscriber, - Iterator iter, + CheckedIterator iter, Throwable throwable) { this.subscriber = subscriber; this.iter = iter; @@ -117,7 +121,18 @@ protected void run() { } subscriber.onNext(next); } - if (!iter.hasNext() && !cancelled) { + + boolean hasNext; + try { + hasNext = iter.hasNext(); + } catch (Exception e) { + completed = true; + pullScheduler.stop(); + subscriber.onError(e); + return; + } + + if (!hasNext && !cancelled) { completed = true; pullScheduler.stop(); subscriber.onComplete(); diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index dd5443c503567..81cef1f330ca1 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -98,7 +98,7 @@ List copy(byte[] content, int offset, int length) { @Override public void subscribe(Flow.Subscriber subscriber) { List copy = copy(content, offset, length); - var delegate = new PullPublisher<>(copy); + var delegate = new PullPublisher<>(CheckedIterable.fromIterable(copy)); delegate.subscribe(subscriber); } @@ -171,7 +171,7 @@ public Iterator iterator() { @Override public void subscribe(Flow.Subscriber subscriber) { - Iterable iterable = this::iterator; + CheckedIterable iterable = () -> CheckedIterator.fromIterator(iterator()); var delegate = new PullPublisher<>(iterable); delegate.subscribe(subscriber); } @@ -207,7 +207,7 @@ public StringPublisher(String content, Charset charset) { public static class EmptyPublisher implements BodyPublisher { private final Flow.Publisher delegate = - new PullPublisher(Collections.emptyList(), null); + new PullPublisher<>(CheckedIterable.fromIterable(Collections.emptyList()), null); @Override public long contentLength() { @@ -289,7 +289,7 @@ public long contentLength() { /** * Reads one buffer ahead all the time, blocking in hasNext() */ - public static class StreamIterator implements Iterator { + public static class StreamIterator implements CheckedIterator { final InputStream is; final Supplier bufSupplier; private volatile boolean eof; @@ -330,20 +330,8 @@ private int read() throws IOException { return n; } - /** - * Close stream in this instance. - * UncheckedIOException may be thrown if IOE happens at InputStream::close. - */ - private void closeStream() { - try { - is.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - @Override - public boolean hasNext() { + public boolean hasNext() throws IOException { stateLock.lock(); try { return hasNext0(); @@ -352,7 +340,7 @@ public boolean hasNext() { } } - private boolean hasNext0() { + private boolean hasNext0() throws IOException { if (need2Read) { try { haveNext = read() != -1; @@ -362,10 +350,10 @@ private boolean hasNext0() { } catch (IOException e) { haveNext = false; need2Read = false; - throw new UncheckedIOException(e); + throw new IOException(e); } finally { if (!haveNext) { - closeStream(); + is.close(); } } } @@ -373,7 +361,7 @@ private boolean hasNext0() { } @Override - public ByteBuffer next() { + public ByteBuffer next() throws IOException { stateLock.lock(); try { if (!hasNext()) { @@ -408,7 +396,7 @@ public void subscribe(Flow.Subscriber subscriber) { publisher.subscribe(subscriber); } - protected Iterable iterableOf(InputStream is) { + CheckedIterable iterableOf(InputStream is) { return () -> new StreamIterator(is); }