001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     https://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018// import javax.annotation.concurrent.GuardedBy;
019import java.io.EOFException;
020import java.io.FilterInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.ByteBuffer;
024import java.util.Objects;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.locks.Condition;
030import java.util.concurrent.locks.ReentrantLock;
031
032import org.apache.commons.io.IOUtils;
033import org.apache.commons.io.build.AbstractStreamBuilder;
034
035/**
036 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
037 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
038 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
039 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
040 * <p>
041 * To build an instance, use {@link Builder}.
042 * </p>
043 * <p>
044 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
045 * </p>
046 *
047 * @see Builder
048 * @since 2.9.0
049 */
050public class ReadAheadInputStream extends FilterInputStream {
051
052    // @formatter:off
053    /**
054     * Builds a new {@link ReadAheadInputStream}.
055     *
056     * <p>
057     * For example:
058     * </p>
059     * <pre>{@code
060     * ReadAheadInputStream s = ReadAheadInputStream.builder()
061     *   .setPath(path)
     *   .setExecutorService(Executors.newSingleThreadExecutor())
063     *   .get();}
064     * </pre>
065     * <p>
066     * If an {@link ExecutorService} is not set, then a single-threaded daemon executor service is used.
067     * </p>
068     *
069     * @see #get()
070     * @since 2.12.0
071     */
072    // @formatter:on
073    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
074
075        private ExecutorService executorService;
076
077        /**
078         * Constructs a new builder of {@link ReadAheadInputStream}.
079         */
080        public Builder() {
081            // empty
082        }
083
084        /**
085         * Builds a new {@link ReadAheadInputStream}.
086         * <p>
087         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
088         * </p>
089         * <p>
090         * This builder uses the following aspects:
091         * </p>
092         * <ul>
093         * <li>{@link #getInputStream()} gets the target aspect.</li>
094         * <li>{@link #getBufferSize()}</li>
095         * <li>{@link ExecutorService}, if not set, a single-threaded daemon executor service is used.</li>
096         * </ul>
097         *
098         * @return a new instance.
099         * @throws IllegalStateException         if the {@code origin} is {@code null}.
100         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
101         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
102         * @see #getInputStream()
103         * @see #getBufferSize()
104         * @see #getUnchecked()
105         */
106        @Override
107        public ReadAheadInputStream get() throws IOException {
108            return new ReadAheadInputStream(this);
109        }
110
111        /**
112         * Sets the executor service for the read-ahead thread.
113         * <p>
114         * If not set, a single-threaded daemon executor service is used.
115         * </p>
116         *
117         * @param executorService the executor service for the read-ahead thread, may be {@code null}.
118         * @return {@code this} instance.
119         */
120        public Builder setExecutorService(final ExecutorService executorService) {
121            this.executorService = executorService;
122            return this;
123        }
124
125    }
126
    // Per-thread one-element scratch array; read() fetches it to delegate a single-byte read to read(byte[], int, int).
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
128
129    /**
130     * Constructs a new {@link Builder}.
131     *
132     * @return a new {@link Builder}.
133     * @since 2.12.0
134     */
135    public static Builder builder() {
136        return new Builder();
137    }
138
139    /**
140     * Constructs a new daemon thread.
141     *
142     * @param r the thread's runnable.
143     * @return a new daemon thread.
144     */
145    private static Thread newDaemonThread(final Runnable r) {
146        final Thread thread = new Thread(r, "commons-io-read-ahead");
147        thread.setDaemon(true);
148        return thread;
149    }
150
151    /**
152     * Constructs a new daemon executor service.
153     *
154     * @return a new daemon executor service.
155     */
156    private static ExecutorService newExecutorService() {
157        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
158    }
159
    // Lock protecting the mutable state below; asyncReadComplete is a condition of this lock.
    private final ReentrantLock stateChangeLock = new ReentrantLock();

    // @GuardedBy("stateChangeLock")
    // Buffer that read() and skip() consume from.
    private ByteBuffer activeBuffer;

    // @GuardedBy("stateChangeLock")
    // Buffer whose backing array the read-ahead task fills; swapped with activeBuffer by swapBuffers().
    private ByteBuffer readAheadBuffer;

    // @GuardedBy("stateChangeLock")
    // Set by the read-ahead task when the underlying read returns a negative count or throws EOFException.
    private boolean endOfStream;

    // @GuardedBy("stateChangeLock")
    // true if async read is in progress
    private boolean readInProgress;

    // @GuardedBy("stateChangeLock")
    // true if read is aborted due to an exception in reading from underlying input stream.
    private boolean readAborted;

    // @GuardedBy("stateChangeLock")
    // The exception that aborted the read; rethrown to the reader by checkReadException().
    private Throwable readException;

    // @GuardedBy("stateChangeLock")
    // whether the close method is called.
    private boolean isClosed;

    // @GuardedBy("stateChangeLock")
    // true when the close method will close the underlying input stream. This is valid only if
    // `isClosed` is true.
    private boolean isUnderlyingInputStreamBeingClosed;

    // @GuardedBy("stateChangeLock")
    // whether there is a read ahead task running.
    private boolean isReading;

    // Whether there is a reader waiting for data.
    private final AtomicBoolean isWaiting = new AtomicBoolean();

    // Executor that runs the read-ahead task.
    private final ExecutorService executorService;

    // Whether close() shuts down executorService.
    private final boolean shutdownExecutorService;

    // Signalled by the read-ahead task when it finishes; readers await it while readInProgress is true.
    private final Condition asyncReadComplete = stateChangeLock.newCondition();
203
204    @SuppressWarnings("resource")
205    private ReadAheadInputStream(final Builder builder) throws IOException {
206        this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
207                builder.executorService == null);
208    }
209
    /**
     * Constructs an instance with the specified buffer size, using a newly created executor service for the read-ahead work.
     * <p>
     * The executor service is obtained from {@code newExecutorService()} and is flagged to be shut down when this stream is closed.
     * </p>
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
    }
221
    /**
     * Constructs an instance with the specified buffer size and a caller-supplied executor service.
     * <p>
     * The given executor service is flagged as not to be shut down when this stream is closed.
     * </p>
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService   An executor service for the read-ahead thread.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
        this(inputStream, bufferSizeInBytes, executorService, false);
    }
234
235    /**
236     * Constructs an instance with the specified buffer size and read-ahead threshold.
237     *
238     * @param inputStream             The underlying input stream.
239     * @param bufferSizeInBytes       The buffer size.
240     * @param executorService         An executor service for the read-ahead thread.
241     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
242     */
243    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
244            final boolean shutdownExecutorService) {
245        super(Objects.requireNonNull(inputStream, "inputStream"));
246        if (bufferSizeInBytes <= 0) {
247            throw new IllegalArgumentException(String.format("bufferSizeInBytes <= 0, bufferSizeInBytes = %,d", bufferSizeInBytes));
248        }
249        this.executorService = Objects.requireNonNull(executorService, "executorService");
250        this.shutdownExecutorService = shutdownExecutorService;
251        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
252        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
253        this.activeBuffer.flip();
254        this.readAheadBuffer.flip();
255    }
256
257    @Override
258    public int available() throws IOException {
259        stateChangeLock.lock();
260        // Make sure we have no integer overflow.
261        try {
262            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
263        } finally {
264            stateChangeLock.unlock();
265        }
266    }
267
268    private void checkReadException() throws IOException {
269        if (readAborted) {
270            if (readException instanceof IOException) {
271                throw (IOException) readException;
272            }
273            throw new IOException(readException);
274        }
275    }
276
277    @Override
278    public void close() throws IOException {
279        boolean isSafeToCloseUnderlyingInputStream = false;
280        stateChangeLock.lock();
281        try {
282            if (isClosed) {
283                return;
284            }
285            isClosed = true;
286            if (!isReading) {
287                // Nobody is reading, so we can close the underlying input stream in this method.
288                isSafeToCloseUnderlyingInputStream = true;
289                // Flip this to make sure the read ahead task will not close the underlying input stream.
290                isUnderlyingInputStreamBeingClosed = true;
291            }
292        } finally {
293            stateChangeLock.unlock();
294        }
295        if (shutdownExecutorService) {
296            try {
297                shutdownAwait();
298            } catch (final InterruptedException e) {
299                Thread.currentThread().interrupt();
300                throw Input.toInterruptedIOException(e);
301            } finally {
302                if (isSafeToCloseUnderlyingInputStream) {
303                    super.close();
304                }
305            }
306        }
307        if (isSafeToCloseUnderlyingInputStream) {
308            super.close();
309        }
310    }
311
312    private void closeUnderlyingInputStreamIfNecessary() {
313        boolean needToCloseUnderlyingInputStream = false;
314        stateChangeLock.lock();
315        try {
316            isReading = false;
317            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
318                // close method cannot close underlyingInputStream because we were reading.
319                needToCloseUnderlyingInputStream = true;
320            }
321        } finally {
322            stateChangeLock.unlock();
323        }
324        if (needToCloseUnderlyingInputStream) {
325            try {
326                super.close();
327            } catch (final IOException ignored) {
328                // TODO Rethrow as UncheckedIOException?
329            }
330        }
331    }
332
333    private boolean isEndOfStream() {
334        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
335    }
336
337    @Override
338    public int read() throws IOException {
339        if (activeBuffer.hasRemaining()) {
340            // short path - just get one byte.
341            return activeBuffer.get() & 0xFF;
342        }
343        final byte[] oneByteArray = BYTE_ARRAY_1.get();
344        oneByteArray[0] = 0;
345        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
346    }
347
348    @Override
349    public int read(final byte[] b, final int offset, int len) throws IOException {
350        IOUtils.checkFromIndexSize(b, offset, len);
351        if (len == 0) {
352            return 0;
353        }
354        if (!activeBuffer.hasRemaining()) {
355            // No remaining in active buffer - lock and switch to write ahead buffer.
356            stateChangeLock.lock();
357            try {
358                waitForAsyncReadComplete();
359                if (!readAheadBuffer.hasRemaining()) {
360                    // The first read.
361                    readAsync();
362                    waitForAsyncReadComplete();
363                    if (isEndOfStream()) {
364                        return EOF;
365                    }
366                }
367                // Swap the newly read ahead buffer in place of empty active buffer.
368                swapBuffers();
369                // After swapping buffers, trigger another async read for read ahead buffer.
370                readAsync();
371            } finally {
372                stateChangeLock.unlock();
373            }
374        }
375        len = Math.min(len, activeBuffer.remaining());
376        activeBuffer.get(b, offset, len);
377
378        return len;
379    }
380
    /**
     * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
     * <p>
     * Does nothing if the end of the stream was already recorded or a read is already in progress. Otherwise the read-ahead buffer is emptied,
     * {@code readInProgress} is set, and a task is handed to the executor service that fills the buffer's backing array from the underlying stream.
     * </p>
     *
     * @throws IOException if a previous asynchronous read was aborted by an exception.
     */
    private void readAsync() throws IOException {
        stateChangeLock.lock();
        final byte[] arr;
        try {
            // Backing array of the read-ahead buffer; the task below writes into it directly.
            arr = readAheadBuffer.array();
            if (endOfStream || readInProgress) {
                return;
            }
            checkReadException();
            // position(0) followed by flip() leaves position == limit == 0, i.e. an empty buffer.
            readAheadBuffer.position(0);
            readAheadBuffer.flip();
            readInProgress = true;
        } finally {
            stateChangeLock.unlock();
        }
        executorService.execute(() -> {
            stateChangeLock.lock();
            try {
                if (isClosed) {
                    // NOTE(review): readInProgress is cleared here without signalling asyncReadComplete;
                    // confirm that no reader can be blocked in waitForAsyncReadComplete() at this point.
                    readInProgress = false;
                    return;
                }
                // Flip this so that the close method will not close the underlying input stream when we
                // are reading.
                isReading = true;
            } finally {
                stateChangeLock.unlock();
            }

            // Please note that it is safe to release the lock and read into the read ahead buffer
            // because either of following two conditions will hold:
            //
            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
            //
            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
            // for this async read to complete.
            //
            // So there is no race condition in both the situations.
            int read = 0;
            int off = 0;
            int len = arr.length;
            Throwable exception = null;
            try {
                // try to fill the read ahead buffer.
                // if a reader is waiting, possibly return early.
                do {
                    read = in.read(arr, off, len);
                    if (read <= 0) {
                        break;
                    }
                    off += read;
                    len -= read;
                } while (len > 0 && !isWaiting.get());
            } catch (final Throwable ex) {
                exception = ex;
                if (ex instanceof Error) {
                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
                    // The user can see Error in UncaughtExceptionHandler.
                    throw (Error) ex;
                }
            } finally {
                stateChangeLock.lock();
                try {
                    // Expose exactly the bytes that were read (position is still 0).
                    readAheadBuffer.limit(off);
                    if (read < 0 || exception instanceof EOFException) {
                        endOfStream = true;
                    } else if (exception != null) {
                        // Record the failure; checkReadException() rethrows it to the reader.
                        readAborted = true;
                        readException = exception;
                    }
                    readInProgress = false;
                    signalAsyncReadComplete();
                } finally {
                    stateChangeLock.unlock();
                }
                closeUnderlyingInputStreamIfNecessary();
            }
        });
    }
465
466    boolean shutdownAwait() throws InterruptedException {
467        executorService.shutdownNow();
468        return executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
469    }
470
471    private void signalAsyncReadComplete() {
472        stateChangeLock.lock();
473        try {
474            asyncReadComplete.signalAll();
475        } finally {
476            stateChangeLock.unlock();
477        }
478    }
479
480    @Override
481    public long skip(final long n) throws IOException {
482        if (n <= 0L) {
483            return 0L;
484        }
485        if (n <= activeBuffer.remaining()) {
486            // Only skipping from active buffer is sufficient
487            activeBuffer.position((int) n + activeBuffer.position());
488            return n;
489        }
490        stateChangeLock.lock();
491        final long skipped;
492        try {
493            skipped = skipInternal(n);
494        } finally {
495            stateChangeLock.unlock();
496        }
497        return skipped;
498    }
499
500    /**
501     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
502     * calling this function.
503     *
504     * @param n the number of bytes to be skipped.
505     * @return the actual number of bytes skipped.
506     * @throws IOException if an I/O error occurs.
507     */
508    private long skipInternal(final long n) throws IOException {
509        if (!stateChangeLock.isLocked()) {
510            throw new IllegalStateException("Expected stateChangeLock to be locked");
511        }
512        waitForAsyncReadComplete();
513        if (isEndOfStream()) {
514            return 0;
515        }
516        if (available() >= n) {
517            // we can skip from the internal buffers
518            int toSkip = (int) n;
519            // We need to skip from both active buffer and read ahead buffer
520            toSkip -= activeBuffer.remaining();
521            if (toSkip <= 0) { // skipping from activeBuffer already handled.
522                throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
523            }
524            activeBuffer.position(0);
525            activeBuffer.flip();
526            readAheadBuffer.position(toSkip + readAheadBuffer.position());
527            swapBuffers();
528            // Trigger async read to emptied read ahead buffer.
529            readAsync();
530            return n;
531        }
532        final int skippedBytes = available();
533        final long toSkip = n - skippedBytes;
534        activeBuffer.position(0);
535        activeBuffer.flip();
536        readAheadBuffer.position(0);
537        readAheadBuffer.flip();
538        final long skippedFromInputStream = in.skip(toSkip);
539        readAsync();
540        return skippedBytes + skippedFromInputStream;
541    }
542
543    /**
544     * Flips the active and read ahead buffers.
545     */
546    private void swapBuffers() {
547        final ByteBuffer temp = activeBuffer;
548        activeBuffer = readAheadBuffer;
549        readAheadBuffer = temp;
550    }
551
552    private void waitForAsyncReadComplete() throws IOException {
553        stateChangeLock.lock();
554        try {
555            isWaiting.set(true);
556            // There is only one reader, and one writer, so the writer should signal only once,
557            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
558            while (readInProgress) {
559                asyncReadComplete.await();
560            }
561        } catch (final InterruptedException e) {
562            Thread.currentThread().interrupt();
563            throw Input.toInterruptedIOException(e);
564        } finally {
565            try {
566                isWaiting.set(false);
567            } finally {
568                stateChangeLock.unlock();
569            }
570        }
571        checkReadException();
572    }
573}