Reimplement "peer finder" logic using funcitonal ReactiveX style.

The benefits of this are as follows:

No longer need to worry about how many types of `Peer`s exist.
There is a single publicly accessible `PeerFinder` which aggregates
the results of both the Bluetooth and Bonjour peer finders. In the
future if another is added, the consumer of the peer finder
(i.e. `StartSwapView`) doesn't need to be aware of this. Neither does
the `SwapService` or `SwapActivity` or any other code.

Never ask "Are we searching" but instead receive push notifications
from the peer finder when it stops searching.

Don't worry about receiving the same peer multiple times, it will
automatically get filtered out.

Less concern about doing things in `AsyncTasks` (and knowing what to
do in an `AsyncTask`). The RXJava + RXAndroid libraries deal with this
by allowing the client consuming the `PeerFinder` to specify which
thread to perform the background task on, and also that the found
`Peer`s should be emitted on the UI thread.

In the future, can play with caching the results of a particular
sequence of found peers. However right now using the `Observable.cache()`
method means we can no longer unsubscribe from the peer finders
and thus they run longer than they need to when we move on from
the initial swap screen.
This commit is contained in:
Peter Serwylo 2016-01-26 16:17:29 +11:00
parent 73d24d987e
commit 10ccd5c503
6 changed files with 181 additions and 165 deletions

View File

@ -34,9 +34,8 @@ import org.fdroid.fdroid.Utils;
import org.fdroid.fdroid.data.NewRepoConfig;
import org.fdroid.fdroid.data.Repo;
import org.fdroid.fdroid.data.RepoProvider;
import org.fdroid.fdroid.localrepo.peers.BluetoothFinder;
import org.fdroid.fdroid.localrepo.peers.BonjourFinder;
import org.fdroid.fdroid.localrepo.peers.Peer;
import org.fdroid.fdroid.localrepo.peers.PeerFinder;
import org.fdroid.fdroid.localrepo.type.BluetoothSwap;
import org.fdroid.fdroid.localrepo.type.SwapType;
import org.fdroid.fdroid.localrepo.type.WifiSwap;
@ -55,6 +54,11 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
/**
* Central service which manages all of the different moving parts of swap which are required
* to enable p2p swapping of apps.
@ -105,15 +109,31 @@ public class SwapService extends Service {
// Search for peers to swap
// ==========================================================
public void scanForPeers() {
Utils.debugLog(TAG, "Scanning for nearby devices to swap with...");
bonjourFinder.scan();
bluetoothFinder.scan();
}
private Observable<Peer> peerFinder;
public void stopScanningForPeers() {
bonjourFinder.cancel();
bluetoothFinder.cancel();
/**
* Call {@link Observable#subscribe()} on this in order to be notified of peers
* which are found. Call {@link Subscription#unsubscribe()} on the resulting
* subscription when finished and you no longer want to scan for peers.
*
* The returned object will scan for peers on a background thread, and emit
* found peers on the mian thread.
*
* Invoking this in multiple places will return the same, cached, peer finder.
* That is, if in the past it already found some peers, then you subscribe
* to it in the future, the future subscriber will still receive the peers
* that were found previously.
* TODO: What about removing peers that no longer are present?
*/
public Observable<Peer> scanForPeers() {
Utils.debugLog(TAG, "Scanning for nearby devices to swap with...");
if (peerFinder == null) {
peerFinder = PeerFinder.createObservable(getApplicationContext())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.distinct();
}
return peerFinder;
}
// ==========================================================
@ -442,14 +462,6 @@ public class SwapService extends Service {
// Interacting with Bluetooth adapter
// ==========================================
public BonjourFinder getBonjourFinder() {
return bonjourFinder;
}
public BluetoothFinder getBluetoothFinder() {
return bluetoothFinder;
}
public boolean isBluetoothDiscoverable() {
return bluetoothSwap.isDiscoverable();
}
@ -458,10 +470,6 @@ public class SwapService extends Service {
return wifiSwap.isConnected() && wifiSwap.getBonjour().isConnected();
}
public boolean isScanningForPeers() {
return bonjourFinder.isScanning() || bluetoothFinder.isScanning();
}
public static final String ACTION_PEER_FOUND = "org.fdroid.fdroid.SwapManager.ACTION_PEER_FOUND";
public static final String EXTRA_PEER = "EXTRA_PEER";
@ -482,9 +490,6 @@ public class SwapService extends Service {
private SwapType bluetoothSwap;
private WifiSwap wifiSwap;
private BonjourFinder bonjourFinder;
private BluetoothFinder bluetoothFinder;
private static final int TIMEOUT = 900000; // 15 mins
/**
@ -517,8 +522,6 @@ public class SwapService extends Service {
appsToSwap.addAll(deserializePackages(preferences.getString(KEY_APPS_TO_SWAP, "")));
bluetoothSwap = BluetoothSwap.create(this);
wifiSwap = new WifiSwap(this);
bonjourFinder = new BonjourFinder(this);
bluetoothFinder = new BluetoothFinder(this);
Preferences.get().registerLocalRepoHttpsListeners(httpsEnabledListener);

View File

@ -11,24 +11,44 @@ import android.util.Log;
import org.fdroid.fdroid.Utils;
public class BluetoothFinder extends PeerFinder<BluetoothPeer> {
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
class BluetoothFinder extends PeerFinder {
public static Observable<Peer> createBluetoothObservable(final Context context) {
return Observable.create(new Observable.OnSubscribe<Peer>() {
@Override
public void call(Subscriber<? super Peer> subscriber) {
final BluetoothFinder finder = new BluetoothFinder(context, subscriber);
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
finder.cancel();
}
}));
finder.scan();
}
});
}
private static final String TAG = "BluetoothFinder";
public static final int DISCOVERABLE_TIMEOUT = 3600;
private final BluetoothAdapter adapter;
public BluetoothFinder(Context context) {
super(context);
BluetoothFinder(Context context, Subscriber<? super Peer> subscriber) {
super(context, subscriber);
adapter = BluetoothAdapter.getDefaultAdapter();
}
private BroadcastReceiver deviceFoundReceiver;
private BroadcastReceiver scanCompleteReceiver;
@Override
public void scan() {
private void scan() {
if (adapter == null) {
Log.i(TAG, "Not scanning for bluetooth peers to swap with, couldn't find a bluetooth adapter on this device.");
@ -84,7 +104,6 @@ public class BluetoothFinder extends PeerFinder<BluetoothPeer> {
}
@Override
public void cancel() {
if (adapter != null) {
Utils.debugLog(TAG, "Stopping bluetooth discovery.");
@ -95,12 +114,11 @@ public class BluetoothFinder extends PeerFinder<BluetoothPeer> {
}
private void onDeviceFound(BluetoothDevice device) {
if (device != null && device.getName() != null &&
(device.getBluetoothClass().getDeviceClass() == BluetoothClass.Device.COMPUTER_HANDHELD_PC_PDA ||
device.getBluetoothClass().getDeviceClass() == BluetoothClass.Device.COMPUTER_PALM_SIZE_PC_PDA ||
device.getBluetoothClass().getDeviceClass() == BluetoothClass.Device.PHONE_SMART)) {
foundPeer(new BluetoothPeer(device));
subscriber.onNext(new BluetoothPeer(device));
}
}

View File

@ -2,8 +2,6 @@ package org.fdroid.fdroid.localrepo.peers;
import android.content.Context;
import android.net.wifi.WifiManager;
import android.os.AsyncTask;
import android.util.Log;
import org.fdroid.fdroid.FDroidApp;
import org.fdroid.fdroid.Utils;
@ -16,7 +14,30 @@ import javax.jmdns.ServiceEvent;
import javax.jmdns.ServiceInfo;
import javax.jmdns.ServiceListener;
public class BonjourFinder extends PeerFinder<BonjourPeer> implements ServiceListener {
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
class BonjourFinder extends PeerFinder implements ServiceListener {
public static Observable<Peer> createBonjourObservable(final Context context) {
return Observable.create(new Observable.OnSubscribe<Peer>() {
@Override
public void call(Subscriber<? super Peer> subscriber) {
final BonjourFinder finder = new BonjourFinder(context, subscriber);
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
finder.cancel();
}
}));
finder.scan();
}
});
}
private static final String TAG = "BonjourFinder";
@ -27,12 +48,11 @@ public class BonjourFinder extends PeerFinder<BonjourPeer> implements ServiceLis
private WifiManager wifiManager;
private WifiManager.MulticastLock mMulticastLock;
public BonjourFinder(Context context) {
super(context);
BonjourFinder(Context context, Subscriber<? super Peer> subscriber) {
super(context, subscriber);
}
@Override
public void scan() {
private void scan() {
Utils.debugLog(TAG, "Requested Bonjour (mDNS) scan for peers.");
@ -44,55 +64,30 @@ public class BonjourFinder extends PeerFinder<BonjourPeer> implements ServiceLis
if (isScanning) {
Utils.debugLog(TAG, "Requested Bonjour scan, but already scanning. But we will still try to explicitly scan for services.");
// listServices();
return;
}
isScanning = true;
mMulticastLock.acquire();
new AsyncTask<Void, Void, Void>() {
@Override
protected Void doInBackground(Void... params) {
try {
Utils.debugLog(TAG, "Searching for Bonjour (mDNS) clients...");
jmdns = JmDNS.create(InetAddress.getByName(FDroidApp.ipAddressString));
} catch (IOException e) {
Log.e(TAG, "", e);
}
return null;
}
@Override
protected void onPostExecute(Void result) {
// TODO: This is not threadsafe - cancelling the discovery will make jmdns null, but it could happen after this check and before call to addServiceListener().
if (jmdns != null) {
Utils.debugLog(TAG, "Adding mDNS service listeners for " + HTTP_SERVICE_TYPE + " and " + HTTPS_SERVICE_TYPE);
jmdns.addServiceListener(HTTP_SERVICE_TYPE, BonjourFinder.this);
jmdns.addServiceListener(HTTPS_SERVICE_TYPE, BonjourFinder.this);
listServices();
}
}
}.execute();
try {
Utils.debugLog(TAG, "Searching for Bonjour (mDNS) clients...");
jmdns = JmDNS.create(InetAddress.getByName(FDroidApp.ipAddressString));
} catch (IOException e) {
subscriber.onError(e);
return;
}
Utils.debugLog(TAG, "Adding mDNS service listeners for " + HTTP_SERVICE_TYPE + " and " + HTTPS_SERVICE_TYPE);
jmdns.addServiceListener(HTTP_SERVICE_TYPE, BonjourFinder.this);
jmdns.addServiceListener(HTTPS_SERVICE_TYPE, BonjourFinder.this);
listServices();
}
private void listServices() {
// The member variable is likely to get set to null if a swap process starts, thus we hold
// a reference for the benefit of the background task so it doesn't have to synchronoize on it.
final JmDNS mdns = jmdns;
new AsyncTask<Void, Void, Void>() {
@Override
protected Void doInBackground(Void... params) {
Utils.debugLog(TAG, "Explicitly querying for services, in addition to waiting for notifications.");
addFDroidServices(mdns.list(HTTP_SERVICE_TYPE));
addFDroidServices(mdns.list(HTTPS_SERVICE_TYPE));
return null;
}
}.execute();
Utils.debugLog(TAG, "Explicitly querying for services, in addition to waiting for notifications.");
addFDroidServices(jmdns.list(HTTP_SERVICE_TYPE));
addFDroidServices(jmdns.list(HTTPS_SERVICE_TYPE));
}
@Override
@ -111,16 +106,8 @@ public class BonjourFinder extends PeerFinder<BonjourPeer> implements ServiceLis
// If so, when is the old one removed?
addFDroidService(event.getInfo());
// The member variable is likely to get set to null if a swap process starts, thus we hold
// a reference for the benefit of the background task so it doesn't have to synchronoize on it.
final JmDNS mdns = jmdns;
new AsyncTask<Void, Void, Void>() {
@Override
protected Void doInBackground(Void... params) {
mdns.requestServiceInfo(event.getType(), event.getName(), true);
return null;
}
}.execute();
Utils.debugLog(TAG, "Found JmDNS service, now requesting further details of service");
jmdns.requestServiceInfo(event.getType(), event.getName(), true);
}
@Override
@ -146,7 +133,7 @@ public class BonjourFinder extends PeerFinder<BonjourPeer> implements ServiceLis
final boolean isSelf = FDroidApp.repo != null && fingerprint != null && fingerprint.equalsIgnoreCase(FDroidApp.repo.fingerprint);
if (isFDroid && !isSelf) {
Utils.debugLog(TAG, "Found F-Droid swap Bonjour service:\n" + serviceInfo);
foundPeer(new BonjourPeer(serviceInfo));
subscriber.onNext(new BonjourPeer(serviceInfo));
} else {
if (isSelf) {
Utils.debugLog(TAG, "Ignoring Bonjour service because it belongs to this device:\n" + serviceInfo);
@ -156,8 +143,9 @@ public class BonjourFinder extends PeerFinder<BonjourPeer> implements ServiceLis
}
}
@Override
public void cancel() {
private void cancel() {
Utils.debugLog(TAG, "Cancelling BonjourFinder, releasing multicast lock, removing jmdns service listeners");
if (mMulticastLock != null) {
mMulticastLock.release();
}

View File

@ -1,45 +1,31 @@
package org.fdroid.fdroid.localrepo.peers;
import android.content.Context;
import android.content.Intent;
import android.support.v4.content.LocalBroadcastManager;
import android.util.Log;
import org.fdroid.fdroid.localrepo.SwapService;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* Searches for other devices in the vicinity, using specific technologies.
* Once found, sends an {@link SwapService#ACTION_PEER_FOUND} intent with the {@link SwapService#EXTRA_PEER}
* extra attribute set to the subclass of {@link Peer} that was found.
* Once found, emits a {@link Peer} to interested {@link Subscriber}s.
*/
public abstract class PeerFinder<T extends Peer> {
private static final String TAG = "PeerFinder";
public abstract class PeerFinder {
protected boolean isScanning;
protected final Context context;
protected final Subscriber<? super Peer> subscriber;
public abstract void scan();
public abstract void cancel();
public PeerFinder(Context context) {
protected PeerFinder(Context context, Subscriber<? super Peer> subscriber) {
this.context = context;
this.subscriber = subscriber;
}
public boolean isScanning() {
return isScanning;
}
protected void foundPeer(T peer) {
Log.i(TAG, "Found peer " + peer.getName());
Intent intent = new Intent(SwapService.ACTION_PEER_FOUND);
intent.putExtra(SwapService.EXTRA_PEER, peer);
LocalBroadcastManager.getInstance(context).sendBroadcast(intent);
}
protected void removePeer(T peer) {
// TODO: Broadcast messages when peers are removed too.
public static Observable<Peer> createObservable(final Context context) {
return Observable.merge(
BluetoothFinder.createBluetoothObservable(context).subscribeOn(Schedulers.newThread()),
BonjourFinder.createBonjourObservable(context).subscribeOn(Schedulers.newThread())
);
}
}

View File

@ -39,6 +39,8 @@ import org.fdroid.fdroid.net.WifiStateChangeService;
import java.util.ArrayList;
import cc.mvdan.accesspoint.WifiApControl;
import rx.Subscriber;
import rx.Subscription;
public class StartSwapView extends ScrollView implements SwapWorkflowActivity.InnerView {
@ -107,17 +109,56 @@ public class StartSwapView extends ScrollView implements SwapWorkflowActivity.In
private ListView peopleNearbyList;
private ProgressBar peopleNearbyProgress;
private PeopleNearbyAdapter peopleNearbyAdapter;
/**
* When peers are emitted by the peer finder, add them to the adapter
* so that they will show up in the list of peers.
*/
private Subscriber<Peer> onPeerFound = new Subscriber<Peer>() {
@Override
public void onCompleted() {
uiShowNotSearchingForPeers();
}
@Override
public void onError(Throwable e) {
uiShowNotSearchingForPeers();
}
@Override
public void onNext(Peer peer) {
Utils.debugLog(TAG, "Found peer: " + peer + ", adding to list of peers in UI.");
peopleNearbyAdapter.add(peer);
}
};
private Subscription peerFinderSubscription;
// TODO: Not sure if this is the best place to handle being removed from the view.
@Override
protected void onDetachedFromWindow() {
super.onDetachedFromWindow();
if (peerFinderSubscription != null) {
peerFinderSubscription.unsubscribe();
peerFinderSubscription = null;
}
}
@Override
protected void onFinishInflate() {
super.onFinishInflate();
getManager().scanForPeers();
if (peerFinderSubscription == null) {
peerFinderSubscription = getManager().scanForPeers().subscribe(onPeerFound);
}
uiInitPeers();
uiInitBluetooth();
uiInitWifi();
uiInitButtons();
uiUpdatePeersInfo();
uiShowSearchingForPeers();
// TODO: Unregister this receiver at some point.
LocalBroadcastManager.getInstance(getActivity()).registerReceiver(
@ -157,48 +198,30 @@ public class StartSwapView extends ScrollView implements SwapWorkflowActivity.In
peopleNearbyList = (ListView) findViewById(R.id.list_people_nearby);
peopleNearbyProgress = (ProgressBar) findViewById(R.id.searching_people_nearby);
final PeopleNearbyAdapter adapter = new PeopleNearbyAdapter(getContext());
peopleNearbyList.setAdapter(adapter);
uiUpdatePeersInfo();
peopleNearbyAdapter = new PeopleNearbyAdapter(getContext());
peopleNearbyList.setAdapter(peopleNearbyAdapter);
peopleNearbyList.setOnItemClickListener(new AdapterView.OnItemClickListener() {
@Override
public void onItemClick(AdapterView<?> parent, View view, int position, long id) {
Peer peer = adapter.getItem(position);
Peer peer = peopleNearbyAdapter.getItem(position);
onPeerSelected(peer);
}
});
// TODO: Unregister this receiver at the right time.
LocalBroadcastManager.getInstance(getContext()).registerReceiver(new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
Peer peer = intent.getParcelableExtra(SwapService.EXTRA_PEER);
if (adapter.getPosition(peer) >= 0) {
Utils.debugLog(TAG, "Found peer: " + peer + ", ignoring though, because it is already in our list.");
} else {
Utils.debugLog(TAG, "Found peer: " + peer + ", adding to list of peers in UI.");
adapter.add(peer);
uiUpdatePeersInfo();
}
}
}, new IntentFilter(SwapService.ACTION_PEER_FOUND));
}
private void uiUpdatePeersInfo() {
if (getManager().isScanningForPeers()) {
peopleNearbyText.setText(getContext().getString(R.string.swap_scanning_for_peers));
peopleNearbyProgress.setVisibility(View.VISIBLE);
} else {
peopleNearbyProgress.setVisibility(View.GONE);
if (peopleNearbyList.getAdapter().getCount() > 0) {
peopleNearbyText.setText(getContext().getString(R.string.swap_people_nearby));
} else {
peopleNearbyText.setText(getContext().getString(R.string.swap_no_peers_nearby));
}
}
private void uiShowSearchingForPeers() {
peopleNearbyText.setText(getContext().getString(R.string.swap_scanning_for_peers));
peopleNearbyProgress.setVisibility(View.VISIBLE);
}
private void uiShowNotSearchingForPeers() {
peopleNearbyProgress.setVisibility(View.GONE);
if (peopleNearbyList.getAdapter().getCount() > 0) {
peopleNearbyText.setText(getContext().getString(R.string.swap_people_nearby));
} else {
peopleNearbyText.setText(getContext().getString(R.string.swap_no_peers_nearby));
}
}
private void uiInitBluetooth() {
@ -224,7 +247,6 @@ public class StartSwapView extends ScrollView implements SwapWorkflowActivity.In
getActivity().startBluetoothSwap();
textBluetoothVisible.setText(R.string.swap_visible_bluetooth);
viewBluetoothId.setVisibility(View.VISIBLE);
uiUpdatePeersInfo();
Utils.debugLog(TAG, "Received onCheckChanged(true) for Bluetooth swap (prompting user or setup Bluetooth complete)");
// TODO: When they deny the request for enabling bluetooth, we need to disable this switch...
} else {
@ -232,7 +254,6 @@ public class StartSwapView extends ScrollView implements SwapWorkflowActivity.In
getManager().getBluetoothSwap().stop();
textBluetoothVisible.setText(R.string.swap_not_visible_bluetooth);
viewBluetoothId.setVisibility(View.GONE);
uiUpdatePeersInfo();
Utils.debugLog(TAG, "Received onCheckChanged(false) for Bluetooth swap, Bluetooth swap disabled successfully.");
}
}
@ -285,7 +306,6 @@ public class StartSwapView extends ScrollView implements SwapWorkflowActivity.In
getManager().getWifiSwap().stop();
Utils.debugLog(TAG, "Received onCheckChanged(false) for WiFi swap, WiFi swap disabled successfully.");
}
uiUpdatePeersInfo();
uiUpdateWifiNetwork();
}
});

View File

@ -44,7 +44,6 @@ import org.fdroid.fdroid.data.NewRepoConfig;
import org.fdroid.fdroid.installer.Installer;
import org.fdroid.fdroid.localrepo.LocalRepoManager;
import org.fdroid.fdroid.localrepo.SwapService;
import org.fdroid.fdroid.localrepo.peers.BluetoothFinder;
import org.fdroid.fdroid.localrepo.peers.Peer;
import org.fdroid.fdroid.net.ApkDownloader;
@ -375,7 +374,7 @@ public class SwapWorkflowActivity extends AppCompatActivity {
if (!getService().isEnabled()) {
prepareInitialRepo();
}
getService().scanForPeers();
inflateInnerView(R.layout.swap_blank);
}
@ -511,7 +510,6 @@ public class SwapWorkflowActivity extends AppCompatActivity {
}
public void swapWith(Peer peer) {
getService().stopScanningForPeers();
getService().swapWith(peer);
showSelectApps();
}
@ -623,9 +621,12 @@ public class SwapWorkflowActivity extends AppCompatActivity {
// TODO: Listen for BluetoothAdapter.ACTION_SCAN_MODE_CHANGED and respond if discovery
// is cancelled prematurely.
// 3600 is new maximum! TODO: What about when this expires? What if user manually disables discovery?
final int discoverableTimeout = 3600;
Utils.debugLog(TAG, "Not currently in discoverable mode, so prompting user to enable.");
Intent intent = new Intent(BluetoothAdapter.ACTION_REQUEST_DISCOVERABLE);
intent.putExtra(BluetoothAdapter.EXTRA_DISCOVERABLE_DURATION, BluetoothFinder.DISCOVERABLE_TIMEOUT); // 3600 is new maximum! TODO: What about when this expires? What if user manually disables discovery?
intent.putExtra(BluetoothAdapter.EXTRA_DISCOVERABLE_DURATION, discoverableTimeout);
startActivityForResult(intent, REQUEST_BLUETOOTH_DISCOVERABLE);
}
@ -761,7 +762,7 @@ public class SwapWorkflowActivity extends AppCompatActivity {
bluetooth = "\"" + adapter.getName() + "\" - " + scanModes.get(adapter.getScanMode());
}
wifi = service.getBonjourFinder().isScanning() ? "Y" : " N";
// wifi = service.getBonjourFinder().isScanning() ? "Y" : " N";
message += "Find { BT: " + bluetooth + ", WiFi: " + wifi + "}";
}