/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

// import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.build.AbstractStreamBuilder;

/**
 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
 * <p>
 * To build an instance, use {@link Builder}.
 * </p>
 * <p>
 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
 * </p>
 *
 * @see Builder
 * @since 2.9.0
 */
public class ReadAheadInputStream extends FilterInputStream {

    // @formatter:off
    /**
     * Builds a new {@link ReadAheadInputStream}.
     *
     * <p>
     * For example:
     * </p>
     * <pre>{@code
     * ReadAheadInputStream s = ReadAheadInputStream.builder()
     *   .setPath(path)
     *   .setExecutorService(Executors.newSingleThreadExecutor())
     *   .get();}
     * </pre>
     * <p>
     * If an {@link ExecutorService} is not set, then a single-threaded daemon executor service is used.
     * </p>
     *
     * @see #get()
     * @since 2.12.0
     */
    // @formatter:on
    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {

        /** The executor service for the read-ahead task; {@code null} means "create a private daemon executor". */
        private ExecutorService executorService;

        /**
         * Constructs a new builder of {@link ReadAheadInputStream}.
         */
        public Builder() {
            // empty
        }

        /**
         * Builds a new {@link ReadAheadInputStream}.
         * <p>
         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
         * </p>
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #getInputStream()} gets the target aspect.</li>
         * <li>{@link #getBufferSize()}</li>
         * <li>{@link ExecutorService}, if not set, a single-threaded daemon executor service is used.</li>
         * </ul>
         *
         * @return a new instance.
         * @throws IllegalStateException         if the {@code origin} is {@code null}.
         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
         * @see #getInputStream()
         * @see #getBufferSize()
         * @see #getUnchecked()
         */
        @Override
        public ReadAheadInputStream get() throws IOException {
            return new ReadAheadInputStream(this);
        }

        /**
         * Sets the executor service for the read-ahead thread.
         * <p>
         * If not set, a single-threaded daemon executor service is used.
         * </p>
         *
         * @param executorService the executor service for the read-ahead thread, may be {@code null}.
         * @return {@code this} instance.
         */
        public Builder setExecutorService(final ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

    }

    /** Per-thread scratch array used by {@link #read()} when the active buffer is empty. */
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);

    /**
     * Constructs a new {@link Builder}.
     *
     * @return a new {@link Builder}.
     * @since 2.12.0
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Constructs a new daemon thread.
     *
     * @param r the thread's runnable.
     * @return a new daemon thread.
     */
    private static Thread newDaemonThread(final Runnable r) {
        final Thread thread = new Thread(r, "commons-io-read-ahead");
        thread.setDaemon(true);
        return thread;
    }

    /**
     * Constructs a new daemon executor service.
     *
     * @return a new daemon executor service.
     */
    private static ExecutorService newExecutorService() {
        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
    }

    private final ReentrantLock stateChangeLock = new ReentrantLock();

    // @GuardedBy("stateChangeLock")
    private ByteBuffer activeBuffer;

    // @GuardedBy("stateChangeLock")
    private ByteBuffer readAheadBuffer;

    // @GuardedBy("stateChangeLock")
    private boolean endOfStream;

    // @GuardedBy("stateChangeLock")
    // true if async read is in progress
    private boolean readInProgress;

    // @GuardedBy("stateChangeLock")
    // true if read is aborted due to an exception in reading from underlying input stream.
    private boolean readAborted;

    // @GuardedBy("stateChangeLock")
    private Throwable readException;

    // @GuardedBy("stateChangeLock")
    // whether the close method is called.
    private boolean isClosed;

    // @GuardedBy("stateChangeLock")
    // true when the close method will close the underlying input stream. This is valid only if
    // `isClosed` is true.
    private boolean isUnderlyingInputStreamBeingClosed;

    // @GuardedBy("stateChangeLock")
    // whether there is a read ahead task running,
    private boolean isReading;

    // Whether there is a reader waiting for data.
    private final AtomicBoolean isWaiting = new AtomicBoolean();

    private final ExecutorService executorService;

    // Whether close() shuts down executorService (true only when this instance created it).
    private final boolean shutdownExecutorService;

    private final Condition asyncReadComplete = stateChangeLock.newCondition();

    /**
     * Constructs an instance from a builder. When the builder has no executor service, a private single-threaded daemon executor service is created and
     * is shut down by {@link #close()}.
     *
     * @param builder the configured builder.
     * @throws IOException if an I/O error occurs converting the builder's origin to an {@link InputStream}.
     */
    @SuppressWarnings("resource")
    private ReadAheadInputStream(final Builder builder) throws IOException {
        this(builder.getInputStream(), builder.getBufferSize(), builder.executorService != null ? builder.executorService : newExecutorService(),
                builder.executorService == null);
    }

    /**
     * Constructs an instance with the specified buffer size. A new single-threaded daemon executor service is created for the read-ahead task and is shut
     * down by {@link #close()}.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service. The given executor service is not shut down by {@link #close()}.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService   An executor service for the read-ahead thread.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
        this(inputStream, bufferSizeInBytes, executorService, false);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service.
     *
     * @param inputStream             The underlying input stream.
     * @param bufferSizeInBytes       The buffer size.
     * @param executorService         An executor service for the read-ahead thread.
     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
     * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not positive.
     */
    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
            final boolean shutdownExecutorService) {
        super(Objects.requireNonNull(inputStream, "inputStream"));
        if (bufferSizeInBytes <= 0) {
            throw new IllegalArgumentException(String.format("bufferSizeInBytes <= 0, bufferSizeInBytes = %,d", bufferSizeInBytes));
        }
        this.executorService = Objects.requireNonNull(executorService, "executorService");
        this.shutdownExecutorService = shutdownExecutorService;
        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        // Flip both freshly allocated buffers so they start out empty (limit == position == 0).
        this.activeBuffer.flip();
        this.readAheadBuffer.flip();
    }

    /**
     * Returns the number of bytes currently held in the active and read ahead buffers, capped at {@link Integer#MAX_VALUE}.
     *
     * @return the number of buffered bytes.
     * @throws IOException declared by the overridden method; not thrown by this implementation.
     */
    @Override
    public int available() throws IOException {
        stateChangeLock.lock();
        // Make sure we have no integer overflow.
        try {
            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Rethrows the exception recorded by an aborted read-ahead task, wrapping it in an {@link IOException} when it is not one already.
     *
     * @throws IOException if a read-ahead task was aborted by an exception.
     */
    private void checkReadException() throws IOException {
        if (readAborted) {
            if (readException instanceof IOException) {
                throw (IOException) readException;
            }
            throw new IOException(readException);
        }
    }

    /**
     * Closes this stream. Calls after the first are no-ops.
     * <p>
     * If this instance owns its executor service, the executor is shut down and awaited. The underlying input stream is closed here only when no
     * read-ahead task is currently reading from it; otherwise the running task closes it when it finishes.
     * </p>
     *
     * @throws IOException if closing the underlying stream fails, or if the thread is interrupted while awaiting executor termination.
     */
    @Override
    public void close() throws IOException {
        boolean isSafeToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            if (isClosed) {
                return;
            }
            isClosed = true;
            if (!isReading) {
                // Nobody is reading, so we can close the underlying input stream in this method.
                isSafeToCloseUnderlyingInputStream = true;
                // Flip this to make sure the read ahead task will not close the underlying input stream.
                isUnderlyingInputStreamBeingClosed = true;
            }
        } finally {
            stateChangeLock.unlock();
        }
        try {
            if (shutdownExecutorService) {
                shutdownAwait();
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Input.toInterruptedIOException(e);
        } finally {
            // Close the underlying stream exactly once, whether or not the executor shutdown succeeded.
            if (isSafeToCloseUnderlyingInputStream) {
                super.close();
            }
        }
    }

    /**
     * Marks the read-ahead task as finished and closes the underlying input stream when {@link #close()} was called while the task was reading and
     * therefore could not close it itself.
     */
    private void closeUnderlyingInputStreamIfNecessary() {
        boolean needToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            isReading = false;
            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
                // close method cannot close underlyingInputStream because we were reading.
                needToCloseUnderlyingInputStream = true;
            }
        } finally {
            stateChangeLock.unlock();
        }
        if (needToCloseUnderlyingInputStream) {
            try {
                super.close();
            } catch (final IOException ignored) {
                // Best effort: this runs on the read-ahead thread, where there is no caller to report to.
                // TODO Rethrow as UncheckedIOException?
            }
        }
    }

    /**
     * Tests whether both buffers are drained and the underlying stream has reported end-of-stream.
     *
     * @return {@code true} if no more data can be returned.
     */
    private boolean isEndOfStream() {
        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
    }

    /**
     * Reads a single byte, taking it directly from the active buffer when possible.
     *
     * @return the byte read as an unsigned value, or {@code -1} at end-of-stream.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public int read() throws IOException {
        if (activeBuffer.hasRemaining()) {
            // short path - just get one byte.
            return activeBuffer.get() & 0xFF;
        }
        final byte[] oneByteArray = BYTE_ARRAY_1.get();
        oneByteArray[0] = 0;
        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
    }

    /**
     * Reads up to {@code len} bytes from the active buffer, first swapping in the read ahead buffer (and waiting for it to be filled) when the active
     * buffer is empty.
     *
     * @param b      the destination array.
     * @param offset the start offset in {@code b}.
     * @param len    the maximum number of bytes to read.
     * @return the number of bytes read, {@code 0} if {@code len} is zero, or {@code -1} at end-of-stream.
     * @throws IOException if an I/O error occurs, including one recorded by the read-ahead task.
     */
    @Override
    public int read(final byte[] b, final int offset, int len) throws IOException {
        IOUtils.checkFromIndexSize(b, offset, len);
        if (len == 0) {
            return 0;
        }
        if (!activeBuffer.hasRemaining()) {
            // No remaining in active buffer - lock and switch to write ahead buffer.
            stateChangeLock.lock();
            try {
                waitForAsyncReadComplete();
                if (!readAheadBuffer.hasRemaining()) {
                    // The first read.
                    readAsync();
                    waitForAsyncReadComplete();
                    if (isEndOfStream()) {
                        return EOF;
                    }
                }
                // Swap the newly read ahead buffer in place of empty active buffer.
                swapBuffers();
                // After swapping buffers, trigger another async read for read ahead buffer.
                readAsync();
            } finally {
                stateChangeLock.unlock();
            }
        }
        len = Math.min(len, activeBuffer.remaining());
        activeBuffer.get(b, offset, len);

        return len;
    }

    /**
     * Reads data from underlyingInputStream to readAheadBuffer asynchronously.
     * <p>
     * Does nothing if end-of-stream was already reached or a read-ahead task is already in progress.
     * </p>
     *
     * @throws IOException if a previous read-ahead task recorded an exception.
     */
    private void readAsync() throws IOException {
        stateChangeLock.lock();
        final byte[] arr;
        try {
            arr = readAheadBuffer.array();
            if (endOfStream || readInProgress) {
                return;
            }
            checkReadException();
            // Empty the read ahead buffer (position == limit == 0) before handing its array to the task.
            readAheadBuffer.position(0);
            readAheadBuffer.flip();
            readInProgress = true;
        } finally {
            stateChangeLock.unlock();
        }
        executorService.execute(() -> {
            stateChangeLock.lock();
            try {
                if (isClosed) {
                    // NOTE(review): no signal is sent on this path; a reader already blocked in
                    // waitForAsyncReadComplete() would not be woken here - confirm this is intended.
                    readInProgress = false;
                    return;
                }
                // Flip this so that the close method will not close the underlying input stream when we
                // are reading.
                isReading = true;
            } finally {
                stateChangeLock.unlock();
            }

            // Please note that it is safe to release the lock and read into the read ahead buffer
            // because either of following two conditions will hold:
            //
            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
            //
            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
            // for this async read to complete.
            //
            // So there is no race condition in both the situations.
            int read = 0;
            int off = 0;
            int len = arr.length;
            Throwable exception = null;
            try {
                // try to fill the read ahead buffer.
                // if a reader is waiting, possibly return early.
                do {
                    read = in.read(arr, off, len);
                    if (read <= 0) {
                        break;
                    }
                    off += read;
                    len -= read;
                } while (len > 0 && !isWaiting.get());
            } catch (final Throwable ex) {
                exception = ex;
                if (ex instanceof Error) {
                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
                    // The user can see Error in UncaughtExceptionHandler.
                    throw (Error) ex;
                }
            } finally {
                stateChangeLock.lock();
                try {
                    // Publish however many bytes were read before EOF, an exception, or the early exit.
                    readAheadBuffer.limit(off);
                    if (read < 0 || exception instanceof EOFException) {
                        endOfStream = true;
                    } else if (exception != null) {
                        readAborted = true;
                        readException = exception;
                    }
                    readInProgress = false;
                    signalAsyncReadComplete();
                } finally {
                    stateChangeLock.unlock();
                }
                closeUnderlyingInputStreamIfNecessary();
            }
        });
    }

    /**
     * Shuts down the executor service immediately and waits for it to terminate.
     *
     * @return the result of {@link ExecutorService#awaitTermination(long, TimeUnit)}.
     * @throws InterruptedException if interrupted while waiting.
     */
    boolean shutdownAwait() throws InterruptedException {
        executorService.shutdownNow();
        return executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    /**
     * Wakes every thread waiting on {@code asyncReadComplete}.
     */
    private void signalAsyncReadComplete() {
        stateChangeLock.lock();
        try {
            asyncReadComplete.signalAll();
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Skips up to {@code n} bytes, consuming buffered data first and then skipping on the underlying stream.
     *
     * @param n the number of bytes to skip.
     * @return the number of bytes actually skipped; {@code 0} if {@code n} is not positive.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public long skip(final long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }
        if (n <= activeBuffer.remaining()) {
            // Only skipping from active buffer is sufficient
            activeBuffer.position((int) n + activeBuffer.position());
            return n;
        }
        stateChangeLock.lock();
        final long skipped;
        try {
            skipped = skipInternal(n);
        } finally {
            stateChangeLock.unlock();
        }
        return skipped;
    }

    /**
     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
     * calling this function.
     *
     * @param n the number of bytes to be skipped.
     * @return the actual number of bytes skipped.
     * @throws IOException           if an I/O error occurs.
     * @throws IllegalStateException if the calling thread does not hold {@code stateChangeLock}.
     */
    private long skipInternal(final long n) throws IOException {
        // The precondition is that the caller holds the lock, not merely that some thread does.
        if (!stateChangeLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("Expected stateChangeLock to be locked");
        }
        waitForAsyncReadComplete();
        if (isEndOfStream()) {
            return 0;
        }
        if (available() >= n) {
            // we can skip from the internal buffers
            int toSkip = (int) n;
            // We need to skip from both active buffer and read ahead buffer
            toSkip -= activeBuffer.remaining();
            if (toSkip <= 0) { // skipping from activeBuffer already handled.
                throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
            }
            activeBuffer.position(0);
            activeBuffer.flip();
            readAheadBuffer.position(toSkip + readAheadBuffer.position());
            swapBuffers();
            // Trigger async read to emptied read ahead buffer.
            readAsync();
            return n;
        }
        // Not enough buffered data: discard both buffers and skip the remainder on the underlying stream.
        final int skippedBytes = available();
        final long toSkip = n - skippedBytes;
        activeBuffer.position(0);
        activeBuffer.flip();
        readAheadBuffer.position(0);
        readAheadBuffer.flip();
        final long skippedFromInputStream = in.skip(toSkip);
        readAsync();
        return skippedBytes + skippedFromInputStream;
    }

    /**
     * Flips the active and read ahead buffers.
     */
    private void swapBuffers() {
        final ByteBuffer temp = activeBuffer;
        activeBuffer = readAheadBuffer;
        readAheadBuffer = temp;
    }

    /**
     * Blocks until no read-ahead task is in progress, then rethrows any exception that task recorded. While waiting, {@code isWaiting} is set so the
     * read-ahead task may stop filling its buffer early.
     *
     * @throws IOException if interrupted while waiting, or if the read-ahead task was aborted by an exception.
     */
    private void waitForAsyncReadComplete() throws IOException {
        stateChangeLock.lock();
        try {
            isWaiting.set(true);
            // There is only one reader, and one writer, so the writer should signal only once,
            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
            while (readInProgress) {
                asyncReadComplete.await();
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Input.toInterruptedIOException(e);
        } finally {
            try {
                isWaiting.set(false);
            } finally {
                stateChangeLock.unlock();
            }
        }
        checkReadException();
    }
}