diff --git a/just_audio/lib/just_audio.dart b/just_audio/lib/just_audio.dart index 3a16f23..68cdb50 100644 --- a/just_audio/lib/just_audio.dart +++ b/just_audio/lib/just_audio.dart @@ -7,8 +7,6 @@ import 'package:flutter/foundation.dart'; import 'package:flutter/services.dart'; import 'package:flutter/widgets.dart'; import 'package:just_audio_platform_interface/just_audio_platform_interface.dart'; -import 'package:path/path.dart' as p; -import 'package:path_provider/path_provider.dart'; import 'package:rxdart/rxdart.dart'; import 'package:uuid/uuid.dart'; @@ -604,7 +602,7 @@ class AudioPlayer { Future _load(AudioPlayerPlatform platform, AudioSource source, {_InitialSeekValues initialSeekValues}) async { try { - if (!kIsWeb && source._requiresHeaders) { + if (!kIsWeb && source._requiresProxy) { if (_proxy == null) { _proxy = _ProxyHttpServer(); await _proxy.start(); @@ -1291,25 +1289,43 @@ class SequenceState { class _ProxyHttpServer { HttpServer _server; - /// Maps request keys to [_ProxyRequest]s. - final Map _uriMap = {}; + /// Maps request keys to [_ProxyHandler]s. + final Map _handlerMap = {}; /// The port this server is bound to on localhost. This is set only after /// [start] has completed. int get port => _server.port; - /// Associate headers with a URL. This may be called only after [start] has - /// completed. - Uri addUrl(Uri url, Map headers) { - final path = _requestKey(url); - _uriMap[path] = _ProxyRequest(url, headers); - return url.replace( + /// Register a [UriAudioSource] to be served through this proxy. This may be + /// called only after [start] has completed. + Uri addUriAudioSource(UriAudioSource source) { + final uri = source.uri; + final headers = source.headers?.cast(); + final path = _requestKey(uri); + if (uri.scheme == 'asset') { + _handlerMap[path] = _proxyHandlerForAsset(uri); + } else { + _handlerMap[path] = _proxyHandlerForUri(uri, headers); + } + return uri.replace( scheme: 'http', host: InternetAddress.loopbackIPv4.address, port: port, ); } + /// Register a [StreamAudioSource] to be served through this proxy. This may + /// be called only after [start] has completed. + Uri addStreamAudioSource(StreamAudioSource source) { + final uri = _sourceUri(source); + final path = _requestKey(uri); + _handlerMap[path] = _proxyHandlerForSource(source); + return uri; + } + + Uri _sourceUri(StreamAudioSource source) => Uri.http( + '${InternetAddress.loopbackIPv4.address}:$port', '/id/${source._id}'); + /// A unique key for each request that can be processed by this proxy, /// made up of the URL path and query string. It is not possible to /// simultaneously track requests that have the same URL path and query @@ -1321,75 +1337,9 @@ class _ProxyHttpServer { _server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0); _server.listen((request) async { if (request.method == 'GET') { - final path = _requestKey(request.uri); - final proxyRequest = _uriMap[path]; - final originRequest = await HttpClient().getUrl(proxyRequest.uri); - - // Rewrite request headers - final host = originRequest.headers.value('host'); - originRequest.headers.clear(); - request.headers.forEach((name, value) { - originRequest.headers.set(name, value); - }); - for (var name in proxyRequest.headers.keys) { - originRequest.headers.set(name, proxyRequest.headers[name]); - } - originRequest.headers.set('host', host); - - // Try to make normal request - try { - final originResponse = await originRequest.close(); - - request.response.headers.clear(); - originResponse.headers.forEach((name, value) { - request.response.headers.set(name, value); - }); - request.response.statusCode = originResponse.statusCode; - - // Pipe response - await originResponse.pipe(request.response); - await request.response.close(); - } on HttpException { - // We likely are dealing with a streaming protocol - if (proxyRequest.uri.scheme == 'http') { - // Try parsing HTTP 0.9 response - //request.response.headers.clear(); - final socket = await Socket.connect( - proxyRequest.uri.host, proxyRequest.uri.port); - final clientSocket = - await request.response.detachSocket(writeHeaders: false); - Completer done = Completer(); - socket.listen( - clientSocket.add, - onDone: () async { - await clientSocket.flush(); - socket.close(); - clientSocket.close(); - done.complete(); - }, - ); - // Rewrite headers - final headers = {}; - request.headers.forEach((name, value) { - if (name.toLowerCase() != 'host') { - headers[name] = value.join(","); - } - }); - for (var name in proxyRequest.headers.keys) { - headers[name] = proxyRequest.headers[name]; - } - socket.write("GET ${proxyRequest.uri.path} HTTP/1.1\n"); - if (host != null) { - socket.write("Host: $host\n"); - } - for (var name in headers.keys) { - socket.write("$name: ${headers[name]}\n"); - } - socket.write("\n"); - await socket.flush(); - await done.future; - } - } + final uriPath = _requestKey(request.uri); + final handler = _handlerMap[uriPath]; + handler(request); } }); } @@ -1398,12 +1348,39 @@ class _ProxyHttpServer { Future stop() => _server.close(); } -/// A request for a URL and headers made by a [_ProxyHttpServer]. -class _ProxyRequest { - final Uri uri; - final Map headers; +/// Encapsulates the start and end of an HTTP range request, along with the +/// total byte length of the available data in the resource. +class _HttpRange { + /// The starting byte position of the range request. + final int start; - _ProxyRequest(this.uri, this.headers); + /// The last byte position of the range request, or `null` if requesting + /// until the end of the media. + final int end; + + /// The total number of bytes in the entire media. + final int fullLength; + + _HttpRange(this.start, this.end, this.fullLength); + + /// The end byte position (exclusive), defaulting to [fullLength]. + int get endEx => end == null ? fullLength : end + 1; + + /// The number of bytes requested. + int get length => endEx == null ? null : endEx - start; + + /// The content-range header value to use in HTTP responses. + String get contentRangeHeader => + 'bytes $start-${end?.toString() ?? ""}/$fullLength'; + + /// Creates an [_HttpRange] from [header] and [fullLength]. + static _HttpRange parse(List header, int fullLength) { + if (header == null || header.isEmpty) return null; + final match = RegExp(r'^bytes=(\d+)(-(\d+)?)?').firstMatch(header.first); + if (match == null) return null; + int intGroup(int i) => match[i] != null ? int.parse(match[i]) : null; + return _HttpRange(intGroup(1), intGroup(3), fullLength); + } } /// Specifies a source of audio to be played. Audio sources are composable @@ -1453,7 +1430,7 @@ abstract class AudioSource { AudioSourceMessage _toMessage(); - bool get _requiresHeaders; + bool get _requiresProxy; List get sequence; @@ -1501,11 +1478,8 @@ abstract class UriAudioSource extends IndexedAudioSource { @override Future _setup(AudioPlayer player) async { await super._setup(player); - if (uri.scheme == 'asset') { - _overrideUri = Uri.file( - (await _loadAsset(uri.path.replaceFirst(RegExp(r'^/'), ''))).path); - } else if (headers != null) { - _overrideUri = player._proxy.addUrl(uri, headers.cast()); + if (uri.scheme == 'asset' || headers != null) { + _overrideUri = player._proxy.addUriAudioSource(this); } } @@ -1517,25 +1491,8 @@ abstract class UriAudioSource extends IndexedAudioSource { super._dispose(); } - Future _loadAsset(String assetPath) async { - final file = await _getCacheFile(assetPath); - this._cacheFile = file; - if (!file.existsSync()) { - await file.create(recursive: true); - await file.writeAsBytes( - (await rootBundle.load(assetPath)).buffer.asUint8List()); - } - return file; - } - - /// Get file for caching asset media with proper extension - Future _getCacheFile(final String assetPath) async => File(p.join( - (await getTemporaryDirectory()).path, - 'just_audio_asset_cache', - '${_player._id}_$_id${p.extension(assetPath)}')); - @override - bool get _requiresHeaders => headers != null; + bool get _requiresProxy => headers != null || uri.scheme == 'asset'; } /// An [AudioSource] representing a regular media file such as an MP3 or M4A @@ -1804,8 +1761,7 @@ class ConcatenatingAudioSource extends AudioSource { } @override - bool get _requiresHeaders => - children.any((source) => source._requiresHeaders); + bool get _requiresProxy => children.any((source) => source._requiresProxy); @override AudioSourceMessage _toMessage() => ConcatenatingAudioSourceMessage( @@ -1840,7 +1796,7 @@ class ClippingAudioSource extends IndexedAudioSource { } @override - bool get _requiresHeaders => child._requiresHeaders; + bool get _requiresProxy => child._requiresProxy; @override AudioSourceMessage _toMessage() => ClippingAudioSourceMessage( @@ -1876,13 +1832,195 @@ class LoopingAudioSource extends AudioSource { List get shuffleIndices => List.generate(count, (i) => i); @override - bool get _requiresHeaders => child._requiresHeaders; + bool get _requiresProxy => child._requiresProxy; @override AudioSourceMessage _toMessage() => LoopingAudioSourceMessage( id: _id, child: child._toMessage(), count: count); } +/// An [AudioSource] that provides audio dynamically. Subclasses must override +/// [read] and [lengthInBytes] to provide the encoded audio data. +abstract class StreamAudioSource extends IndexedAudioSource { + Uri _uri; + StreamAudioSource(tag) : super(tag); + + @override + Future _setup(AudioPlayer player) async { + await super._setup(player); + _uri = player._proxy.addStreamAudioSource(this); + } + + /// Used by the player to read a byte range of encoded audio data in small + /// chunks, from byte position [start] inclusive (or from the beginning of the + /// audio data if not specified) to [end] exclusive (or the end of the audio + /// data if not specified). + Stream> read([int start, int end]); + + /// Used by the player to determine the total number of bytes of data in this + /// audio source. + int get lengthInBytes; + + @override + bool get _requiresProxy => true; + + @override + AudioSourceMessage _toMessage() => ProgressiveAudioSourceMessage( + id: _id, uri: _uri.toString(), headers: null); +} + +/// An asset cache that holds loaded assets in memory for 10 seconds. +class _AssetCache { + static final _assets = >{}; + static final _timers = {}; + + static Future load(String path) async { + var asset = _assets[path]; + if (asset == null) { + _assets[path] = asset = rootBundle.load(path); + } else { + _timers[path].cancel(); + } + _timers[path] = Timer(Duration(seconds: 10), () { + _assets.remove(path); + _timers.remove(path); + }); + return asset; + } +} + +/// The type of functions that can handle HTTP requests sent to the proxy. +typedef void _ProxyHandler(HttpRequest request); + +/// A proxy handler for serving assets. +_ProxyHandler _proxyHandlerForAsset(Uri assetUri) { + Future handler(HttpRequest request) async { + final assetPath = assetUri.path.replaceFirst(RegExp(r'^/'), ''); + // This would be better if Flutter provided a stream-based API to load + // assets. + final byteData = await _AssetCache.load(assetPath); + + final range = _HttpRange.parse( + request.headers[HttpHeaders.rangeHeader], byteData.lengthInBytes); + + request.response.headers.clear(); + request.response.headers.set(HttpHeaders.acceptRangesHeader, 'bytes'); + request.response.statusCode = range == null ? 200 : 206; + final length = range?.length ?? byteData.lengthInBytes; + request.response.contentLength = length; + if (range != null) { + request.response.headers + .set(HttpHeaders.contentRangeHeader, range.contentRangeHeader); + } + + // Write response + final bytes = byteData.buffer.asUint8List(range?.start ?? 0, length); + request.response.add(bytes); + await request.response.flush(); + await request.response.close(); + } + + return handler; +} + +/// A proxy handler for serving audio from a [StreamAudioSource]. +_ProxyHandler _proxyHandlerForSource(StreamAudioSource source) { + Future handler(HttpRequest request) async { + final range = _HttpRange.parse( + request.headers[HttpHeaders.rangeHeader], source.lengthInBytes); + + request.response.headers.clear(); + request.response.headers.set(HttpHeaders.acceptRangesHeader, 'bytes'); + request.response.statusCode = range == null ? 200 : 206; + final length = range?.length; + if (length != null) { + request.response.contentLength = length; + request.response.headers + .set(HttpHeaders.contentRangeHeader, range.contentRangeHeader); + } + + // Pipe response + await source.read(range?.start ?? 0, range?.endEx).pipe(request.response); + await request.response.close(); + } + + return handler; +} + +/// A proxy handler for serving audio from a URI with optional headers. +_ProxyHandler _proxyHandlerForUri(Uri uri, Map headers) { + Future handler(HttpRequest request) async { + final originRequest = await HttpClient().getUrl(uri); + + // Rewrite request headers + final host = originRequest.headers.value('host'); + originRequest.headers.clear(); + request.headers.forEach((name, value) { + originRequest.headers.set(name, value); + }); + for (var name in headers.keys) { + originRequest.headers.set(name, headers[name]); + } + originRequest.headers.set('host', host); + + // Try to make normal request + try { + final originResponse = await originRequest.close(); + + request.response.headers.clear(); + originResponse.headers.forEach((name, value) { + request.response.headers.set(name, value); + }); + request.response.statusCode = originResponse.statusCode; + + // Pipe response + await originResponse.pipe(request.response); + await request.response.close(); + } on HttpException { + // We likely are dealing with a streaming protocol + if (uri.scheme == 'http') { + // Try parsing HTTP 0.9 response + //request.response.headers.clear(); + final socket = await Socket.connect(uri.host, uri.port); + final clientSocket = + await request.response.detachSocket(writeHeaders: false); + Completer done = Completer(); + socket.listen( + clientSocket.add, + onDone: () async { + await clientSocket.flush(); + socket.close(); + clientSocket.close(); + done.complete(); + }, + ); + // Rewrite headers + final headers = {}; + request.headers.forEach((name, value) { + if (name.toLowerCase() != 'host') { + headers[name] = value.join(","); + } + }); + for (var name in headers.keys) { + headers[name] = headers[name]; + } + socket.write("GET ${uri.path} HTTP/1.1\n"); + if (host != null) { + socket.write("Host: $host\n"); + } + for (var name in headers.keys) { + socket.write("$name: ${headers[name]}\n"); + } + socket.write("\n"); + await socket.flush(); + await done.future; + } + } + } + + return handler; +} + /// Defines the algorithm for shuffling the order of a /// [ConcatenatingAudioSource]. See [DefaultShuffleOrder] for a default /// implementation. @@ -1965,8 +2103,11 @@ class DefaultShuffleOrder extends ShuffleOrder { } } +/// An enumeration of modes that can be passed to [AudioPlayer.setLoopMode]. enum LoopMode { off, one, all } +/// The stand-in platform implementation to use when the player is in the idle +/// state and the native platform is deallocated. class _IdleAudioPlayer extends AudioPlayerPlatform { final _eventSubject = BehaviorSubject(); Duration _position;