From 35710f975774998dc1574863414458fd94d3b05c Mon Sep 17 00:00:00 2001 From: fuwa Date: Wed, 24 Jul 2019 21:10:34 +0000 Subject: [PATCH] add daemon input sink --- cyberwow/lib/controller/syncing.dart | 5 ++++- cyberwow/lib/main.dart | 5 +++-- cyberwow/lib/state.dart | 16 ++++++++++------ 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/cyberwow/lib/controller/syncing.dart b/cyberwow/lib/controller/syncing.dart index 5515d23..c3b1513 100644 --- a/cyberwow/lib/controller/syncing.dart +++ b/cyberwow/lib/controller/syncing.dart @@ -30,7 +30,7 @@ import 'helper.dart'; import '../config.dart' as config; import '../logging.dart'; -Stream runBinary (String name) async* { +Stream runBinary (String name, {Stream input}) async* { final newPath = await getBinaryPath(name); final appDocDir = await getApplicationDocumentsDirectory(); @@ -60,6 +60,9 @@ Stream runBinary (String name) async* { log.info('args: ' + args.toString()); final outputProcess = await Process.start(newPath, args); + if (input != null) { + outputProcess.stdin.addStream(input.transform(utf8.encoder)); + } await for (final line in outputProcess.stdout.transform(utf8.decoder)) { yield line; } diff --git a/cyberwow/lib/main.dart b/cyberwow/lib/main.dart index 1150fc0..57cb311 100644 --- a/cyberwow/lib/main.dart +++ b/cyberwow/lib/main.dart @@ -123,9 +123,10 @@ class _CyberWOW_PageState extends State with WidgetsBindingObserv SyncingState _syncingState = await _loadingState.next(loading, ''); - final syncing = runBinary(binName).asBroadcastStream(); + final StreamController inputStreamController = StreamController(); + final syncing = runBinary(binName, input: inputStreamController.stream).asBroadcastStream(); - SyncedState _syncedState = await _syncingState.next(syncing); + SyncedState _syncedState = await _syncingState.next(inputStreamController.sink, syncing); await _syncedState.next(); var validState = true; diff --git a/cyberwow/lib/state.dart b/cyberwow/lib/state.dart index 37db11e..359aa59 100644 --- a/cyberwow/lib/state.dart +++ b/cyberwow/lib/state.dart @@ -156,7 +156,7 @@ class SyncingState extends HookedState { syncState(); } - Future next(Stream processOutput) async { + Future next(StreamSink processInput, Stream processOutput) async { log.fine("Syncing next"); Future printStdout() async { @@ -191,7 +191,7 @@ class SyncingState extends HookedState { log.fine('syncing: loop exit'); final _height = await rpc.height(); - SyncedState _next = SyncedState(setState, getNotification, stdout, processOutput); + SyncedState _next = SyncedState(setState, getNotification, stdout, processInput, processOutput); _next.height = _height; return moveState(_next); } @@ -200,6 +200,7 @@ class SyncingState extends HookedState { class SyncedState extends HookedState { Queue stdout; int height; + StreamSink processInput; Stream processOutput; bool synced = true; bool connected = true; @@ -208,7 +209,7 @@ class SyncedState extends HookedState { List getConnections = []; List getTransactionPool = []; - SyncedState(f, s, this.stdout, this.processOutput) : super (f, s); + SyncedState(f, s, this.stdout, this.processInput, this.processOutput) : super (f, s); Future next() async { log.fine("Synced next"); @@ -240,6 +241,8 @@ class SyncedState extends HookedState { getTransactionPool = await rpc.getTransactionPoolSimple(); + // processInput.add('help\n'); + // log.fine('getTransactionPool: $getTransactionPool'); syncState(); } @@ -249,7 +252,7 @@ class SyncedState extends HookedState { log.fine('synced: loop exit'); - ReSyncingState _next = ReSyncingState(setState, getNotification, stdout, processOutput); + ReSyncingState _next = ReSyncingState(setState, getNotification, stdout, processInput, processOutput); return moveState(_next); } } @@ -257,10 +260,11 @@ class SyncedState extends HookedState { class ReSyncingState extends HookedState { Queue stdout; + StreamSink processInput; Stream processOutput; bool synced = false; - ReSyncingState(f, s, this.stdout, this.processOutput) : super (f, s); + ReSyncingState(f, s, this.stdout, this.processInput, this.processOutput) : super (f, s); void append(String msg) { stdout.addLast(msg); @@ -297,7 +301,7 @@ class ReSyncingState extends HookedState { await checkSync(); log.fine('resync: await exit'); - SyncedState _next = SyncedState(setState, getNotification, stdout, processOutput); + SyncedState _next = SyncedState(setState, getNotification, stdout, processInput, processOutput); _next.height = await rpc.height(); return moveState(_next); }