add daemon input sink

pull/2/head
fuwa 5 years ago
parent 37f2492518
commit 35710f9757

@ -30,7 +30,7 @@ import 'helper.dart';
import '../config.dart' as config; import '../config.dart' as config;
import '../logging.dart'; import '../logging.dart';
Stream<String> runBinary (String name) async* { Stream<String> runBinary (String name, {Stream<String> input}) async* {
final newPath = await getBinaryPath(name); final newPath = await getBinaryPath(name);
final appDocDir = await getApplicationDocumentsDirectory(); final appDocDir = await getApplicationDocumentsDirectory();
@ -60,6 +60,9 @@ Stream<String> runBinary (String name) async* {
log.info('args: ' + args.toString()); log.info('args: ' + args.toString());
final outputProcess = await Process.start(newPath, args); final outputProcess = await Process.start(newPath, args);
if (input != null) {
outputProcess.stdin.addStream(input.transform(utf8.encoder));
}
await for (final line in outputProcess.stdout.transform(utf8.decoder)) { await for (final line in outputProcess.stdout.transform(utf8.decoder)) {
yield line; yield line;
} }

@ -123,9 +123,10 @@ class _CyberWOW_PageState extends State<CyberWOW_Page> with WidgetsBindingObserv
SyncingState _syncingState = await _loadingState.next(loading, ''); SyncingState _syncingState = await _loadingState.next(loading, '');
final syncing = runBinary(binName).asBroadcastStream(); final StreamController<String> inputStreamController = StreamController();
final syncing = runBinary(binName, input: inputStreamController.stream).asBroadcastStream();
SyncedState _syncedState = await _syncingState.next(syncing); SyncedState _syncedState = await _syncingState.next(inputStreamController.sink, syncing);
await _syncedState.next(); await _syncedState.next();
var validState = true; var validState = true;

@ -156,7 +156,7 @@ class SyncingState extends HookedState {
syncState(); syncState();
} }
Future<SyncedState> next(Stream<String> processOutput) async { Future<SyncedState> next(StreamSink<String> processInput, Stream<String> processOutput) async {
log.fine("Syncing next"); log.fine("Syncing next");
Future<void> printStdout() async { Future<void> printStdout() async {
@ -191,7 +191,7 @@ class SyncingState extends HookedState {
log.fine('syncing: loop exit'); log.fine('syncing: loop exit');
final _height = await rpc.height(); final _height = await rpc.height();
SyncedState _next = SyncedState(setState, getNotification, stdout, processOutput); SyncedState _next = SyncedState(setState, getNotification, stdout, processInput, processOutput);
_next.height = _height; _next.height = _height;
return moveState(_next); return moveState(_next);
} }
@ -200,6 +200,7 @@ class SyncingState extends HookedState {
class SyncedState extends HookedState { class SyncedState extends HookedState {
Queue<String> stdout; Queue<String> stdout;
int height; int height;
StreamSink<String> processInput;
Stream<String> processOutput; Stream<String> processOutput;
bool synced = true; bool synced = true;
bool connected = true; bool connected = true;
@ -208,7 +209,7 @@ class SyncedState extends HookedState {
List<dynamic> getConnections = []; List<dynamic> getConnections = [];
List<dynamic> getTransactionPool = []; List<dynamic> getTransactionPool = [];
SyncedState(f, s, this.stdout, this.processOutput) : super (f, s); SyncedState(f, s, this.stdout, this.processInput, this.processOutput) : super (f, s);
Future<ReSyncingState> next() async { Future<ReSyncingState> next() async {
log.fine("Synced next"); log.fine("Synced next");
@ -240,6 +241,8 @@ class SyncedState extends HookedState {
getTransactionPool = await rpc.getTransactionPoolSimple(); getTransactionPool = await rpc.getTransactionPoolSimple();
// processInput.add('help\n');
// log.fine('getTransactionPool: $getTransactionPool'); // log.fine('getTransactionPool: $getTransactionPool');
syncState(); syncState();
} }
@ -249,7 +252,7 @@ class SyncedState extends HookedState {
log.fine('synced: loop exit'); log.fine('synced: loop exit');
ReSyncingState _next = ReSyncingState(setState, getNotification, stdout, processOutput); ReSyncingState _next = ReSyncingState(setState, getNotification, stdout, processInput, processOutput);
return moveState(_next); return moveState(_next);
} }
} }
@ -257,10 +260,11 @@ class SyncedState extends HookedState {
class ReSyncingState extends HookedState { class ReSyncingState extends HookedState {
Queue<String> stdout; Queue<String> stdout;
StreamSink<String> processInput;
Stream<String> processOutput; Stream<String> processOutput;
bool synced = false; bool synced = false;
ReSyncingState(f, s, this.stdout, this.processOutput) : super (f, s); ReSyncingState(f, s, this.stdout, this.processInput, this.processOutput) : super (f, s);
void append(String msg) { void append(String msg) {
stdout.addLast(msg); stdout.addLast(msg);
@ -297,7 +301,7 @@ class ReSyncingState extends HookedState {
await checkSync(); await checkSync();
log.fine('resync: await exit'); log.fine('resync: await exit');
SyncedState _next = SyncedState(setState, getNotification, stdout, processOutput); SyncedState _next = SyncedState(setState, getNotification, stdout, processInput, processOutput);
_next.height = await rpc.height(); _next.height = await rpc.height();
return moveState(_next); return moveState(_next);
} }