001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.util.ArrayList; 022import java.util.List; 023 024import org.apache.commons.io.IOUtils; 025 026 027/** 028 * The {@link ObservableInputStream} allows, that an InputStream may be consumed 029 * by other receivers, apart from the thread, which is reading it. 030 * The other consumers are implemented as instances of {@link Observer}. A 031 * typical application may be the generation of a {@link java.security.MessageDigest} on the 032 * fly. 033 * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe, 034 * as instances of InputStream usually aren't. 035 * If you must access the stream from multiple threads, then synchronization, locking, 036 * or a similar means must be used. 037 * @see MessageDigestCalculatingInputStream 038 */ 039public class ObservableInputStream extends ProxyInputStream { 040 041 /** 042 * Abstracts observer callback for {@code ObservableInputStream}s. 043 */ 044 public static abstract class Observer { 045 046 /** 047 * Called to indicate, that {@link InputStream#read()} has been invoked 048 * on the {@link ObservableInputStream}, and will return a value. 049 * @param pByte The value, which is being returned. This will never be -1 (EOF), 050 * because, in that case, {@link #finished()} will be invoked instead. 051 * @throws IOException if an i/o-error occurs 052 */ 053 public void data(final int pByte) throws IOException { 054 // noop 055 } 056 057 /** 058 * Called to indicate that {@link InputStream#read(byte[])}, or 059 * {@link InputStream#read(byte[], int, int)} have been called, and are about to 060 * invoke data. 061 * @param pBuffer The byte array, which has been passed to the read call, and where 062 * data has been stored. 063 * @param pOffset The offset within the byte array, where data has been stored. 064 * @param pLength The number of bytes, which have been stored in the byte array. 065 * @throws IOException if an i/o-error occurs 066 */ 067 public void data(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException { 068 // noop 069 } 070 071 /** 072 * Called to indicate that EOF has been seen on the underlying stream. 073 * This method may be called multiple times, if the reader keeps invoking 074 * either of the read methods, and they will consequently keep returning 075 * EOF. 076 * @throws IOException if an i/o-error occurs 077 */ 078 public void finished() throws IOException { 079 // noop 080 } 081 082 /** 083 * Called to indicate that the {@link ObservableInputStream} has been closed. 084 * @throws IOException if an i/o-error occurs 085 */ 086 public void closed() throws IOException { 087 // noop 088 } 089 090 /** 091 * Called to indicate that an error occurred on the underlying stream. 092 * @param pException the exception to throw 093 * @throws IOException if an i/o-error occurs 094 */ 095 public void error(final IOException pException) throws IOException { throw pException; } 096 } 097 098 private final List<Observer> observers = new ArrayList<>(); 099 100 /** 101 * Creates a new ObservableInputStream for the given InputStream. 102 * @param pProxy the input stream to proxy 103 */ 104 public ObservableInputStream(final InputStream pProxy) { 105 super(pProxy); 106 } 107 108 /** 109 * Adds an Observer. 110 * @param pObserver the observer to add 111 */ 112 public void add(final Observer pObserver) { 113 observers.add(pObserver); 114 } 115 116 /** 117 * Removes an Observer. 118 * @param pObserver the observer to remove 119 */ 120 public void remove(final Observer pObserver) { 121 observers.remove(pObserver); 122 } 123 124 /** 125 * Removes all Observers. 126 */ 127 public void removeAllObservers() { 128 observers.clear(); 129 } 130 131 @Override 132 public int read() throws IOException { 133 int result = 0; 134 IOException ioe = null; 135 try { 136 result = super.read(); 137 } catch (final IOException pException) { 138 ioe = pException; 139 } 140 if (ioe != null) { 141 noteError(ioe); 142 } else if (result == -1) { 143 noteFinished(); 144 } else { 145 noteDataByte(result); 146 } 147 return result; 148 } 149 150 @Override 151 public int read(final byte[] pBuffer) throws IOException { 152 int result = 0; 153 IOException ioe = null; 154 try { 155 result = super.read(pBuffer); 156 } catch (final IOException pException) { 157 ioe = pException; 158 } 159 if (ioe != null) { 160 noteError(ioe); 161 } else if (result == -1) { 162 noteFinished(); 163 } else if (result > 0) { 164 noteDataBytes(pBuffer, 0, result); 165 } 166 return result; 167 } 168 169 @Override 170 public int read(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException { 171 int result = 0; 172 IOException ioe = null; 173 try { 174 result = super.read(pBuffer, pOffset, pLength); 175 } catch (final IOException pException) { 176 ioe = pException; 177 } 178 if (ioe != null) { 179 noteError(ioe); 180 } else if (result == -1) { 181 noteFinished(); 182 } else if (result > 0) { 183 noteDataBytes(pBuffer, pOffset, result); 184 } 185 return result; 186 } 187 188 /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)} 189 * with the given arguments. 190 * @param pBuffer Passed to the observers. 191 * @param pOffset Passed to the observers. 192 * @param pLength Passed to the observers. 193 * @throws IOException Some observer has thrown an exception, which is being 194 * passed down. 195 */ 196 protected void noteDataBytes(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException { 197 for (final Observer observer : getObservers()) { 198 observer.data(pBuffer, pOffset, pLength); 199 } 200 } 201 202 /** Notifies the observers by invoking {@link Observer#finished()}. 203 * @throws IOException Some observer has thrown an exception, which is being 204 * passed down. 205 */ 206 protected void noteFinished() throws IOException { 207 for (final Observer observer : getObservers()) { 208 observer.finished(); 209 } 210 } 211 212 /** Notifies the observers by invoking {@link Observer#data(int)} 213 * with the given arguments. 214 * @param pDataByte Passed to the observers. 215 * @throws IOException Some observer has thrown an exception, which is being 216 * passed down. 217 */ 218 protected void noteDataByte(final int pDataByte) throws IOException { 219 for (final Observer observer : getObservers()) { 220 observer.data(pDataByte); 221 } 222 } 223 224 /** Notifies the observers by invoking {@link Observer#error(IOException)} 225 * with the given argument. 226 * @param pException Passed to the observers. 227 * @throws IOException Some observer has thrown an exception, which is being 228 * passed down. This may be the same exception, which has been passed as an 229 * argument. 230 */ 231 protected void noteError(final IOException pException) throws IOException { 232 for (final Observer observer : getObservers()) { 233 observer.error(pException); 234 } 235 } 236 237 /** Notifies the observers by invoking {@link Observer#finished()}. 238 * @throws IOException Some observer has thrown an exception, which is being 239 * passed down. 240 */ 241 protected void noteClosed() throws IOException { 242 for (final Observer observer : getObservers()) { 243 observer.closed(); 244 } 245 } 246 247 /** Gets all currently registered observers. 248 * @return a list of the currently registered observers 249 */ 250 protected List<Observer> getObservers() { 251 return observers; 252 } 253 254 @Override 255 public void close() throws IOException { 256 IOException ioe = null; 257 try { 258 super.close(); 259 } catch (final IOException e) { 260 ioe = e; 261 } 262 if (ioe == null) { 263 noteClosed(); 264 } else { 265 noteError(ioe); 266 } 267 } 268 269 /** Reads all data from the underlying {@link InputStream}, while notifying the 270 * observers. 271 * @throws IOException The underlying {@link InputStream}, or either of the 272 * observers has thrown an exception. 273 */ 274 public void consume() throws IOException { 275 final byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE]; 276 for (;;) { 277 final int res = read(buffer); 278 if (res == -1) { 279 return; 280 } 281 } 282 } 283 284}