123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- package com.yc.videocache;
- import java.util.concurrent.atomic.AtomicInteger;
- import static com.yc.videocache.Preconditions.checkNotNull;
- /**
- * Proxy for {@link Source} with caching support ({@link Cache}).
- * <p/>
- * Can be used only for sources with persistent data (that doesn't change with time).
- * Method {@link #read(byte[], long, int)} will be blocked while fetching data from source.
- * Useful for streaming something with caching e.g. streaming video/audio etc.
- */
- class ProxyCache {
- private static final int MAX_READ_SOURCE_ATTEMPTS = 1;
- private final Source source;
- private final Cache cache;
- private final Object wc = new Object();
- private final Object stopLock = new Object();
- private final AtomicInteger readSourceErrorsCount;
- private volatile Thread sourceReaderThread;
- private volatile boolean stopped;
- private volatile int percentsAvailable = -1;
- public ProxyCache(Source source, Cache cache) {
- this.source = checkNotNull(source);
- this.cache = checkNotNull(cache);
- this.readSourceErrorsCount = new AtomicInteger();
- }
- public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
- ProxyCacheUtils.assertBuffer(buffer, offset, length);
- while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
- readSourceAsync();
- waitForSourceData();
- checkReadSourceErrorsCount();
- }
- int read = cache.read(buffer, offset, length);
- if (cache.isCompleted() && percentsAvailable != 100) {
- percentsAvailable = 100;
- onCachePercentsAvailableChanged(100);
- }
- return read;
- }
- private void checkReadSourceErrorsCount() throws ProxyCacheException {
- int errorsCount = readSourceErrorsCount.get();
- if (errorsCount >= MAX_READ_SOURCE_ATTEMPTS) {
- readSourceErrorsCount.set(0);
- throw new ProxyCacheException("Error reading source " + errorsCount + " times");
- }
- }
- public void shutdown() {
- synchronized (stopLock) {
- Logger.debug("Shutdown proxy for " + source);
- try {
- stopped = true;
- if (sourceReaderThread != null) {
- sourceReaderThread.interrupt();
- }
- cache.close();
- } catch (ProxyCacheException e) {
- onError(e);
- }
- }
- }
- private synchronized void readSourceAsync() throws ProxyCacheException {
- boolean readingInProgress = sourceReaderThread != null && sourceReaderThread.getState() != Thread.State.TERMINATED;
- if (!stopped && !cache.isCompleted() && !readingInProgress) {
- sourceReaderThread = new Thread(new SourceReaderRunnable(), "Source reader for " + source);
- sourceReaderThread.start();
- }
- }
- private void waitForSourceData() throws ProxyCacheException {
- synchronized (wc) {
- try {
- wc.wait(1000);
- } catch (InterruptedException e) {
- throw new ProxyCacheException("Waiting source data is interrupted!", e);
- }
- }
- }
- private void notifyNewCacheDataAvailable(long cacheAvailable, long sourceAvailable) {
- onCacheAvailable(cacheAvailable, sourceAvailable);
- synchronized (wc) {
- wc.notifyAll();
- }
- }
- protected void onCacheAvailable(long cacheAvailable, long sourceLength) {
- boolean zeroLengthSource = sourceLength == 0;
- int percents = zeroLengthSource ? 100 : (int) ((float) cacheAvailable / sourceLength * 100);
- boolean percentsChanged = percents != percentsAvailable;
- boolean sourceLengthKnown = sourceLength >= 0;
- if (sourceLengthKnown && percentsChanged) {
- onCachePercentsAvailableChanged(percents);
- }
- percentsAvailable = percents;
- }
- protected void onCachePercentsAvailableChanged(int percentsAvailable) {
- }
- private void readSource() {
- long sourceAvailable = -1;
- long offset = 0;
- try {
- offset = cache.available();
- source.open(offset);
- sourceAvailable = source.length();
- byte[] buffer = new byte[ProxyCacheUtils.DEFAULT_BUFFER_SIZE];
- int readBytes;
- while ((readBytes = source.read(buffer)) != -1) {
- synchronized (stopLock) {
- if (isStopped()) {
- return;
- }
- cache.append(buffer, readBytes);
- }
- offset += readBytes;
- notifyNewCacheDataAvailable(offset, sourceAvailable);
- }
- tryComplete();
- onSourceRead();
- } catch (Throwable e) {
- readSourceErrorsCount.incrementAndGet();
- onError(e);
- } finally {
- closeSource();
- notifyNewCacheDataAvailable(offset, sourceAvailable);
- }
- }
- private void onSourceRead() {
- // guaranteed notify listeners after source read and cache completed
- percentsAvailable = 100;
- onCachePercentsAvailableChanged(percentsAvailable);
- }
- private void tryComplete() throws ProxyCacheException {
- synchronized (stopLock) {
- if (!isStopped() && cache.available() == source.length()) {
- cache.complete();
- }
- }
- }
- private boolean isStopped() {
- return Thread.currentThread().isInterrupted() || stopped;
- }
- private void closeSource() {
- try {
- source.close();
- } catch (ProxyCacheException e) {
- onError(new ProxyCacheException("Error closing source " + source, e));
- }
- }
- protected final void onError(final Throwable e) {
- boolean interruption = e instanceof InterruptedProxyCacheException;
- if (interruption) {
- Logger.debug("ProxyCache is interrupted");
- } else {
- Logger.error("ProxyCache error");
- }
- }
- private class SourceReaderRunnable implements Runnable {
- @Override
- public void run() {
- readSource();
- }
- }
- }
|