Implement caching (experimental).

This commit is contained in:
Ryan Heise 2021-01-17 01:31:31 +11:00
parent 47fde5cc74
commit 0fb6ff5cb1
3 changed files with 206 additions and 30 deletions

View File

@ -1,12 +1,15 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:math';
import 'package:audio_session/audio_session.dart';
import 'package:crypto/crypto.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart';
import 'package:flutter/widgets.dart';
import 'package:just_audio_platform_interface/just_audio_platform_interface.dart';
import 'package:meta/meta.dart' show experimental;
import 'package:path/path.dart' as p;
import 'package:path_provider/path_provider.dart';
import 'package:rxdart/rxdart.dart';
@ -925,6 +928,7 @@ class AudioPlayer {
/// otherwise.
Future<Duration> _setPlatformActive(bool active,
[Completer<void> playCompleter]) {
if (_disposed) return null;
if (active == _active) return _durationFuture;
// This method updates _active and _platform before yielding to the next
// task in the event loop.
@ -942,6 +946,7 @@ class AudioPlayer {
await _disposePlatform(oldPlatform);
}
}
if (_disposed) return null;
// During initialisation, we must only use this platform reference in case
// _platform is updated again during initialisation.
final platform = active
@ -1362,9 +1367,8 @@ class _ProxyHttpServer {
Future stop() => _server.close();
}
/// Encapsulates the start and end of an HTTP range request, along with the
/// total byte length of the available data in the resource.
class _HttpRange {
/// Encapsulates the start and end of an HTTP range request.
class _HttpRangeRequest {
/// The starting byte position of the range request.
final int start;
@ -1372,6 +1376,30 @@ class _HttpRange {
/// until the end of the media.
final int end;
/// The end byte position (exclusive), defaulting to `null`.
int get endEx => end == null ? null : end + 1;
_HttpRangeRequest(this.start, this.end);
/// Creates an [_HttpRange] from [header].
static _HttpRangeRequest parse(List<String> header) {
if (header == null || header.isEmpty) return null;
final match = RegExp(r'^bytes=(\d+)(-(\d+)?)?').firstMatch(header.first);
if (match == null) return null;
int intGroup(int i) => match[i] != null ? int.parse(match[i]) : null;
return _HttpRangeRequest(intGroup(1), intGroup(3));
}
}
/// Encapsulates the range information in an HTTP range response.
class _HttpRange {
/// The starting byte position of the range.
final int start;
/// The last byte position of the range, or `null` if until the end of the
/// media.
final int end;
/// The total number of bytes in the entire media.
final int fullLength;
@ -1386,15 +1414,6 @@ class _HttpRange {
/// The content-range header value to use in HTTP responses.
String get contentRangeHeader =>
'bytes $start-${end?.toString() ?? ""}/$fullLength';
/// Creates an [_HttpRange] from [header] and [fullLength].
static _HttpRange parse(List<String> header, int fullLength) {
if (header == null || header.isEmpty) return null;
final match = RegExp(r'^bytes=(\d+)(-(\d+)?)?').firstMatch(header.first);
if (match == null) return null;
int intGroup(int i) => match[i] != null ? int.parse(match[i]) : null;
return _HttpRange(intGroup(1), intGroup(3), fullLength);
}
}
/// Specifies a source of audio to be played. Audio sources are composable
@ -1523,8 +1542,7 @@ abstract class UriAudioSource extends IndexedAudioSource {
/// Get file for caching asset media with proper extension
Future<File> _getCacheFile(final String assetPath) async => File(p.joinAll([
(await getTemporaryDirectory()).path,
'just_audio_cache',
(await _getCacheDir()).path,
'assets',
...Uri.parse(assetPath).pathSegments,
]));
@ -1878,9 +1896,11 @@ class LoopingAudioSource extends AudioSource {
}
/// An [AudioSource] that provides audio dynamically. Subclasses must override
/// [read] and [lengthInBytes] to provide the encoded audio data.
/// [request] to provide the encoded audio data. This API is experimental.
@experimental
abstract class StreamAudioSource extends IndexedAudioSource {
Uri _uri;
@required
StreamAudioSource(tag) : super(tag);
@override
@ -1889,15 +1909,11 @@ abstract class StreamAudioSource extends IndexedAudioSource {
_uri = player._proxy.addStreamAudioSource(this);
}
/// Used by the player to read a byte range of encoded audio data in small
/// Used by the player to request a byte range of encoded audio data in small
/// chunks, from byte position [start] inclusive (or from the beginning of the
/// audio data if not specified) to [end] exclusive (or the end of the audio
/// data if not specified).
Stream<List<int>> read([int start, int end]);
/// Used by the player to determine the total number of bytes of data in this
/// audio source.
int get lengthInBytes;
Future<StreamAudioResponse> request([int start, int end]);
@override
bool get _requiresProxy => true;
@ -1907,27 +1923,182 @@ abstract class StreamAudioSource extends IndexedAudioSource {
id: _id, uri: _uri.toString(), headers: null);
}
/// The response for a [StreamAudioSource]. This API is experimental.
@experimental
class StreamAudioResponse {
/// The total number of bytes available.
final int sourceLength;
/// The number of bytes returned in this response.
final int contentLength;
/// The starting byte position of the response data.
final int offset;
/// The audio content returned by this response.
final Stream<List<int>> stream;
StreamAudioResponse({
@required this.sourceLength,
@required this.contentLength,
@required this.offset,
@required this.stream,
});
}
/// This is an experimental audio source that caches the audio while it is being
/// downloaded and played.
@experimental
class LockCachingAudioSource extends StreamAudioSource {
Future<HttpClientResponse> _response;
final Uri uri;
final Map headers;
final Future<File> _cacheFile;
int _progress = 0;
final _requests = <_StreamingByteRangeRequest>[];
LockCachingAudioSource(
this.uri, {
this.headers,
File cacheFile,
dynamic tag,
}) : _cacheFile =
cacheFile != null ? Future.value(cacheFile) : _getCacheFile(uri),
super(tag);
/// Get file for caching [uri] with proper extension
static Future<File> _getCacheFile(final Uri uri) async => File(p.joinAll([
(await _getCacheDir()).path,
'remote',
sha256.convert(utf8.encode(uri.toString())).toString() +
p.extension(uri.path),
]));
Future<File> get _partialCacheFile async =>
File('${(await _cacheFile).path}.part');
Future<HttpClientResponse> _fetch() async {
HttpClient httpClient = HttpClient();
final request = await httpClient.getUrl(uri);
if (headers != null) {
request.headers.clear();
headers.forEach((name, value) => request.headers.set(name, value));
}
final response = await request.close();
if (response.statusCode != 200) {
httpClient.close();
throw Exception('HTTP Status Error: ${response.statusCode}');
}
(await _partialCacheFile).createSync(recursive: true);
// TODO: Should close sink after done, but it throws an error.
// ignore: close_sinks
final sink = (await _partialCacheFile).openWrite();
var sourceLength = response.contentLength;
StreamSubscription subscription;
subscription = response.listen((data) {
_progress += data.length;
sink.add(data);
final readyRequests = _requests
.where((request) => request.end ?? sourceLength <= _progress)
.toList();
if (readyRequests.isEmpty) return;
sink.flush().then((_) async {
for (var request in readyRequests) {
_requests.remove(request);
final start = request.start ?? 0;
final end = request.end ?? sourceLength;
request.complete(StreamAudioResponse(
sourceLength: sourceLength,
contentLength: end - start,
offset: start,
stream: (await _effectiveCacheFile).openRead(start, end),
));
}
});
}, onDone: () async {
(await _partialCacheFile).renameSync((await _cacheFile).path);
await subscription.cancel();
httpClient.close();
}, onError: (e, stackTrace) async {
print(stackTrace);
(await _partialCacheFile).deleteSync();
httpClient.close();
});
return response;
}
Future<File> get _effectiveCacheFile async =>
(await _partialCacheFile).existsSync() ? _partialCacheFile : _cacheFile;
@override
Future<StreamAudioResponse> request([int start, int end]) async {
final cacheFile = await _cacheFile;
start ??= 0;
if (cacheFile.existsSync()) {
final sourceLength = cacheFile.lengthSync();
end ??= sourceLength;
return StreamAudioResponse(
sourceLength: sourceLength,
contentLength: end - start,
offset: start,
stream: cacheFile.openRead(start, end),
);
}
final byteRangeRequest = _StreamingByteRangeRequest(start, end);
_requests.add(byteRangeRequest);
if (_response == null) {
_response = _fetch();
} else {}
return byteRangeRequest.future;
}
}
/// Request parameters for a [StreamingAudioSource].
class _StreamingByteRangeRequest {
/// The start of the range request.
final int start;
/// The end of the range request.
final int end;
/// Completes when the response is available.
final _completer = Completer<StreamAudioResponse>();
_StreamingByteRangeRequest(this.start, this.end);
/// The response for this request.
Future<StreamAudioResponse> get future => _completer.future;
/// Completes this request with the given [response].
void complete(StreamAudioResponse response) {
_completer.complete(response);
}
}
/// The type of functions that can handle HTTP requests sent to the proxy.
typedef void _ProxyHandler(HttpRequest request);
/// A proxy handler for serving audio from a [StreamAudioSource].
_ProxyHandler _proxyHandlerForSource(StreamAudioSource source) {
Future<void> handler(HttpRequest request) async {
final range = _HttpRange.parse(
request.headers[HttpHeaders.rangeHeader], source.lengthInBytes);
final rangeRequest =
_HttpRangeRequest.parse(request.headers[HttpHeaders.rangeHeader]);
request.response.headers.clear();
request.response.headers.set(HttpHeaders.acceptRangesHeader, 'bytes');
request.response.statusCode = range == null ? 200 : 206;
final length = range?.length;
if (length != null) {
request.response.contentLength = length;
request.response.statusCode = rangeRequest == null ? 200 : 206;
final sourceResponse =
await source.request(rangeRequest?.start, rangeRequest?.endEx);
final range = _HttpRange(rangeRequest?.start ?? 0, rangeRequest?.end,
sourceResponse.sourceLength);
request.response.contentLength = range.length;
if (rangeRequest != null) {
request.response.headers
.set(HttpHeaders.contentRangeHeader, range.contentRangeHeader);
}
// Pipe response
await source.read(range?.start ?? 0, range?.endEx).pipe(request.response);
await sourceResponse.stream.pipe(request.response);
await request.response.close();
}
@ -2010,6 +2181,9 @@ _ProxyHandler _proxyHandlerForUri(Uri uri, Map headers) {
return handler;
}
Future<Directory> _getCacheDir() async =>
Directory(p.join((await getTemporaryDirectory()).path, 'just_audio_cache'));
/// Defines the algorithm for shuffling the order of a
/// [ConcatenatingAudioSource]. See [DefaultShuffleOrder] for a default
/// implementation.

View File

@ -114,7 +114,7 @@ packages:
source: hosted
version: "2.1.1"
crypto:
dependency: transitive
dependency: "direct main"
description:
name: crypto
url: "https://pub.dartlang.org"
@ -220,7 +220,7 @@ packages:
source: hosted
version: "0.12.10-nullsafety.1"
meta:
dependency: transitive
dependency: "direct main"
description:
name: meta
url: "https://pub.dartlang.org"

View File

@ -16,6 +16,8 @@ dependencies:
path_provider: ^1.6.10
async: ^2.4.0
uuid: ^2.2.0
crypto: ^2.1.5
meta: ^1.2.4
flutter:
sdk: flutter