Serve assets from proxy, add StreamAudioSource.

This commit is contained in:
Ryan Heise 2021-01-07 20:49:15 +11:00
parent 3d5eaf7a5f
commit 2779ae71b4
1 changed files with 254 additions and 113 deletions

View File

@ -7,8 +7,6 @@ import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart';
import 'package:flutter/widgets.dart';
import 'package:just_audio_platform_interface/just_audio_platform_interface.dart';
import 'package:path/path.dart' as p;
import 'package:path_provider/path_provider.dart';
import 'package:rxdart/rxdart.dart';
import 'package:uuid/uuid.dart';
@ -604,7 +602,7 @@ class AudioPlayer {
Future<Duration> _load(AudioPlayerPlatform platform, AudioSource source,
{_InitialSeekValues initialSeekValues}) async {
try {
if (!kIsWeb && source._requiresHeaders) {
if (!kIsWeb && source._requiresProxy) {
if (_proxy == null) {
_proxy = _ProxyHttpServer();
await _proxy.start();
@ -1291,25 +1289,43 @@ class SequenceState {
class _ProxyHttpServer {
HttpServer _server;
/// Maps request keys to [_ProxyRequest]s.
final Map<String, _ProxyRequest> _uriMap = {};
/// Maps request keys to [_ProxyHandler]s.
final Map<String, _ProxyHandler> _handlerMap = {};
/// The port this server is bound to on localhost. This is set only after
/// [start] has completed.
int get port => _server.port;
/// Associate headers with a URL. This may be called only after [start] has
/// completed.
Uri addUrl(Uri url, Map<String, String> headers) {
final path = _requestKey(url);
_uriMap[path] = _ProxyRequest(url, headers);
return url.replace(
/// Register a [UriAudioSource] to be served through this proxy. This may be
/// called only after [start] has completed.
Uri addUriAudioSource(UriAudioSource source) {
final uri = source.uri;
final headers = source.headers?.cast<String, String>();
final path = _requestKey(uri);
if (uri.scheme == 'asset') {
_handlerMap[path] = _proxyHandlerForAsset(uri);
} else {
_handlerMap[path] = _proxyHandlerForUri(uri, headers);
}
return uri.replace(
scheme: 'http',
host: InternetAddress.loopbackIPv4.address,
port: port,
);
}
/// Register a [StreamAudioSource] to be served through this proxy. This may
/// be called only after [start] has completed.
Uri addStreamAudioSource(StreamAudioSource source) {
final uri = _sourceUri(source);
final path = _requestKey(uri);
_handlerMap[path] = _proxyHandlerForSource(source);
return uri;
}
Uri _sourceUri(StreamAudioSource source) => Uri.http(
'${InternetAddress.loopbackIPv4.address}:$port', '/id/${source._id}');
/// A unique key for each request that can be processed by this proxy,
/// made up of the URL path and query string. It is not possible to
/// simultaneously track requests that have the same URL path and query
@ -1321,75 +1337,9 @@ class _ProxyHttpServer {
_server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
_server.listen((request) async {
if (request.method == 'GET') {
final path = _requestKey(request.uri);
final proxyRequest = _uriMap[path];
final originRequest = await HttpClient().getUrl(proxyRequest.uri);
// Rewrite request headers
final host = originRequest.headers.value('host');
originRequest.headers.clear();
request.headers.forEach((name, value) {
originRequest.headers.set(name, value);
});
for (var name in proxyRequest.headers.keys) {
originRequest.headers.set(name, proxyRequest.headers[name]);
}
originRequest.headers.set('host', host);
// Try to make normal request
try {
final originResponse = await originRequest.close();
request.response.headers.clear();
originResponse.headers.forEach((name, value) {
request.response.headers.set(name, value);
});
request.response.statusCode = originResponse.statusCode;
// Pipe response
await originResponse.pipe(request.response);
await request.response.close();
} on HttpException {
// We likely are dealing with a streaming protocol
if (proxyRequest.uri.scheme == 'http') {
// Try parsing HTTP 0.9 response
//request.response.headers.clear();
final socket = await Socket.connect(
proxyRequest.uri.host, proxyRequest.uri.port);
final clientSocket =
await request.response.detachSocket(writeHeaders: false);
Completer done = Completer();
socket.listen(
clientSocket.add,
onDone: () async {
await clientSocket.flush();
socket.close();
clientSocket.close();
done.complete();
},
);
// Rewrite headers
final headers = <String, String>{};
request.headers.forEach((name, value) {
if (name.toLowerCase() != 'host') {
headers[name] = value.join(",");
}
});
for (var name in proxyRequest.headers.keys) {
headers[name] = proxyRequest.headers[name];
}
socket.write("GET ${proxyRequest.uri.path} HTTP/1.1\n");
if (host != null) {
socket.write("Host: $host\n");
}
for (var name in headers.keys) {
socket.write("$name: ${headers[name]}\n");
}
socket.write("\n");
await socket.flush();
await done.future;
}
}
final uriPath = _requestKey(request.uri);
final handler = _handlerMap[uriPath];
handler(request);
}
});
}
@ -1398,12 +1348,39 @@ class _ProxyHttpServer {
Future stop() => _server.close();
}
/// A request for a URL and headers made by a [_ProxyHttpServer].
class _ProxyRequest {
final Uri uri;
final Map<String, String> headers;
/// Encapsulates the start and end of an HTTP range request, along with the
/// total byte length of the available data in the resource.
class _HttpRange {
/// The starting byte position of the range request.
final int start;
_ProxyRequest(this.uri, this.headers);
/// The last byte position of the range request, or `null` if requesting
/// until the end of the media.
final int end;
/// The total number of bytes in the entire media.
final int fullLength;
_HttpRange(this.start, this.end, this.fullLength);
/// The end byte position (exclusive), defaulting to [fullLength].
int get endEx => end == null ? fullLength : end + 1;
/// The number of bytes requested.
int get length => endEx == null ? null : endEx - start;
/// The content-range header value to use in HTTP responses.
String get contentRangeHeader =>
'bytes $start-${end?.toString() ?? ""}/$fullLength';
/// Creates an [_HttpRange] from [header] and [fullLength].
static _HttpRange parse(List<String> header, int fullLength) {
if (header == null || header.isEmpty) return null;
final match = RegExp(r'^bytes=(\d+)(-(\d+)?)?').firstMatch(header.first);
if (match == null) return null;
int intGroup(int i) => match[i] != null ? int.parse(match[i]) : null;
return _HttpRange(intGroup(1), intGroup(3), fullLength);
}
}
/// Specifies a source of audio to be played. Audio sources are composable
@ -1453,7 +1430,7 @@ abstract class AudioSource {
AudioSourceMessage _toMessage();
bool get _requiresHeaders;
bool get _requiresProxy;
List<IndexedAudioSource> get sequence;
@ -1501,11 +1478,8 @@ abstract class UriAudioSource extends IndexedAudioSource {
@override
Future<void> _setup(AudioPlayer player) async {
await super._setup(player);
if (uri.scheme == 'asset') {
_overrideUri = Uri.file(
(await _loadAsset(uri.path.replaceFirst(RegExp(r'^/'), ''))).path);
} else if (headers != null) {
_overrideUri = player._proxy.addUrl(uri, headers.cast<String, String>());
if (uri.scheme == 'asset' || headers != null) {
_overrideUri = player._proxy.addUriAudioSource(this);
}
}
@ -1517,25 +1491,8 @@ abstract class UriAudioSource extends IndexedAudioSource {
super._dispose();
}
Future<File> _loadAsset(String assetPath) async {
final file = await _getCacheFile(assetPath);
this._cacheFile = file;
if (!file.existsSync()) {
await file.create(recursive: true);
await file.writeAsBytes(
(await rootBundle.load(assetPath)).buffer.asUint8List());
}
return file;
}
/// Get file for caching asset media with proper extension
Future<File> _getCacheFile(final String assetPath) async => File(p.join(
(await getTemporaryDirectory()).path,
'just_audio_asset_cache',
'${_player._id}_$_id${p.extension(assetPath)}'));
@override
bool get _requiresHeaders => headers != null;
bool get _requiresProxy => headers != null || uri.scheme == 'asset';
}
/// An [AudioSource] representing a regular media file such as an MP3 or M4A
@ -1804,8 +1761,7 @@ class ConcatenatingAudioSource extends AudioSource {
}
@override
bool get _requiresHeaders =>
children.any((source) => source._requiresHeaders);
bool get _requiresProxy => children.any((source) => source._requiresProxy);
@override
AudioSourceMessage _toMessage() => ConcatenatingAudioSourceMessage(
@ -1840,7 +1796,7 @@ class ClippingAudioSource extends IndexedAudioSource {
}
@override
bool get _requiresHeaders => child._requiresHeaders;
bool get _requiresProxy => child._requiresProxy;
@override
AudioSourceMessage _toMessage() => ClippingAudioSourceMessage(
@ -1876,13 +1832,195 @@ class LoopingAudioSource extends AudioSource {
List<int> get shuffleIndices => List.generate(count, (i) => i);
@override
bool get _requiresHeaders => child._requiresHeaders;
bool get _requiresProxy => child._requiresProxy;
@override
AudioSourceMessage _toMessage() => LoopingAudioSourceMessage(
id: _id, child: child._toMessage(), count: count);
}
/// An [AudioSource] that provides audio dynamically. Subclasses must override
/// [read] and [lengthInBytes] to provide the encoded audio data.
abstract class StreamAudioSource extends IndexedAudioSource {
Uri _uri;
StreamAudioSource(tag) : super(tag);
@override
Future<void> _setup(AudioPlayer player) async {
await super._setup(player);
_uri = player._proxy.addStreamAudioSource(this);
}
/// Used by the player to read a byte range of encoded audio data in small
/// chunks, from byte position [start] inclusive (or from the beginning of the
/// audio data if not specified) to [end] exclusive (or the end of the audio
/// data if not specified).
Stream<List<int>> read([int start, int end]);
/// Used by the player to determine the total number of bytes of data in this
/// audio source.
int get lengthInBytes;
@override
bool get _requiresProxy => true;
@override
AudioSourceMessage _toMessage() => ProgressiveAudioSourceMessage(
id: _id, uri: _uri.toString(), headers: null);
}
/// An asset cache that holds loaded assets in memory for 10 seconds.
class _AssetCache {
static final _assets = <String, Future<ByteData>>{};
static final _timers = <String, Timer>{};
static Future<ByteData> load(String path) async {
var asset = _assets[path];
if (asset == null) {
_assets[path] = asset = rootBundle.load(path);
} else {
_timers[path].cancel();
}
_timers[path] = Timer(Duration(seconds: 10), () {
_assets.remove(path);
_timers.remove(path);
});
return asset;
}
}
/// The type of functions that can handle HTTP requests sent to the proxy.
typedef void _ProxyHandler(HttpRequest request);
/// A proxy handler for serving assets.
_ProxyHandler _proxyHandlerForAsset(Uri assetUri) {
Future<void> handler(HttpRequest request) async {
final assetPath = assetUri.path.replaceFirst(RegExp(r'^/'), '');
// This would be better if Flutter provided a stream-based API to load
// assets.
final byteData = await _AssetCache.load(assetPath);
final range = _HttpRange.parse(
request.headers[HttpHeaders.rangeHeader], byteData.lengthInBytes);
request.response.headers.clear();
request.response.headers.set(HttpHeaders.acceptRangesHeader, 'bytes');
request.response.statusCode = range == null ? 200 : 206;
final length = range?.length ?? byteData.lengthInBytes;
request.response.contentLength = length;
if (range != null) {
request.response.headers
.set(HttpHeaders.contentRangeHeader, range.contentRangeHeader);
}
// Write response
final bytes = byteData.buffer.asUint8List(range?.start ?? 0, length);
request.response.add(bytes);
await request.response.flush();
await request.response.close();
}
return handler;
}
/// A proxy handler for serving audio from a [StreamAudioSource].
_ProxyHandler _proxyHandlerForSource(StreamAudioSource source) {
Future<void> handler(HttpRequest request) async {
final range = _HttpRange.parse(
request.headers[HttpHeaders.rangeHeader], source.lengthInBytes);
request.response.headers.clear();
request.response.headers.set(HttpHeaders.acceptRangesHeader, 'bytes');
request.response.statusCode = range == null ? 200 : 206;
final length = range?.length;
if (length != null) {
request.response.contentLength = length;
request.response.headers
.set(HttpHeaders.contentRangeHeader, range.contentRangeHeader);
}
// Pipe response
await source.read(range?.start ?? 0, range?.endEx).pipe(request.response);
await request.response.close();
}
return handler;
}
/// A proxy handler for serving audio from a URI with optional headers.
_ProxyHandler _proxyHandlerForUri(Uri uri, Map headers) {
Future<void> handler(HttpRequest request) async {
final originRequest = await HttpClient().getUrl(uri);
// Rewrite request headers
final host = originRequest.headers.value('host');
originRequest.headers.clear();
request.headers.forEach((name, value) {
originRequest.headers.set(name, value);
});
for (var name in headers.keys) {
originRequest.headers.set(name, headers[name]);
}
originRequest.headers.set('host', host);
// Try to make normal request
try {
final originResponse = await originRequest.close();
request.response.headers.clear();
originResponse.headers.forEach((name, value) {
request.response.headers.set(name, value);
});
request.response.statusCode = originResponse.statusCode;
// Pipe response
await originResponse.pipe(request.response);
await request.response.close();
} on HttpException {
// We likely are dealing with a streaming protocol
if (uri.scheme == 'http') {
// Try parsing HTTP 0.9 response
//request.response.headers.clear();
final socket = await Socket.connect(uri.host, uri.port);
final clientSocket =
await request.response.detachSocket(writeHeaders: false);
Completer done = Completer();
socket.listen(
clientSocket.add,
onDone: () async {
await clientSocket.flush();
socket.close();
clientSocket.close();
done.complete();
},
);
// Rewrite headers
final headers = <String, String>{};
request.headers.forEach((name, value) {
if (name.toLowerCase() != 'host') {
headers[name] = value.join(",");
}
});
for (var name in headers.keys) {
headers[name] = headers[name];
}
socket.write("GET ${uri.path} HTTP/1.1\n");
if (host != null) {
socket.write("Host: $host\n");
}
for (var name in headers.keys) {
socket.write("$name: ${headers[name]}\n");
}
socket.write("\n");
await socket.flush();
await done.future;
}
}
}
return handler;
}
/// Defines the algorithm for shuffling the order of a
/// [ConcatenatingAudioSource]. See [DefaultShuffleOrder] for a default
/// implementation.
@ -1965,8 +2103,11 @@ class DefaultShuffleOrder extends ShuffleOrder {
}
}
/// An enumeration of modes that can be passed to [AudioPlayer.setLoopMode].
enum LoopMode { off, one, all }
/// The stand-in platform implementation to use when the player is in the idle
/// state and the native platform is deallocated.
class _IdleAudioPlayer extends AudioPlayerPlatform {
final _eventSubject = BehaviorSubject<PlaybackEventMessage>();
Duration _position;