RxJava Open-Source Libraries
RxLifecycle
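RxJava's schedulers make it easy to control and switch threads, and this and its other advantages have made it increasingly popular. Used carelessly, however, it can easily leak memory. RxLifecycle targets one specific case: a subscription that is not cancelled in time keeps its Activity/Fragment alive, so the component cannot be destroyed and is leaked.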
Setup
// RxLifecycle core library
compile 'com.trello.rxlifecycle2:rxlifecycle:2.1.0'
// Android library; it uses the Android lifecycle methods
// It already depends on the core library, so the core library need not be declared as well
compile 'com.trello.rxlifecycle2:rxlifecycle-android:2.1.0'
// Android components library; it defines components such as RxAppCompatActivity and RxFragment
// It already depends on the core and Android libraries, so neither needs to be declared again
compile 'com.trello.rxlifecycle2:rxlifecycle-components:2.1.0'
// Android library; used by extending NaviActivity
compile 'com.trello.rxlifecycle2:rxlifecycle-navi:2.1.0'
// Android library; used by extending LifecycleActivity
// Requires Google's Maven repository; usage is similar to rxlifecycle-navi
compile 'com.trello.rxlifecycle2:rxlifecycle-android-lifecycle:2.1.0'
// Google's Maven repository
allprojects {
    repositories {
        jcenter()
        maven { url 'https://dl.google.com/dl/android/maven2/' }
    }
}
// RxLifecycle core library with Kotlin support
compile 'com.trello.rxlifecycle2:rxlifecycle-kotlin:2.1.0'
// Android lifecycle library with Kotlin support
compile 'com.trello.rxlifecycle2:rxlifecycle-android-lifecycle-kotlin:2.1.0'
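Basic API
1. bindToLifecycle()
In a subclass, pass bindToLifecycle() to the Observable's compose operator. This ties the events emitted by the Observable to the current component, keeping them in step with its lifecycle, so the subscription is cancelled automatically when the component's lifecycle ends.
Use compose(this.bindToLifecycle()) to bind to the Activity lifecycle: a stream bound in onStart is unbound once onStop has been called, and likewise for the other callbacks.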
protected void onStart() {
super.onStart();
Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(this.<Long>bindToLifecycle())
.subscribe();
}
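The pairing rule behind bindToLifecycle() (subscribe during one callback, end at its opposite) can be modelled in plain Java. This is only a sketch under assumptions, not RxLifecycle's source: the class, the enum copy, and `endEventFor` are hypothetical names, and the PAUSE and STOP rows reflect my understanding that a late subscription ends at the next teardown callback.

```java
import java.util.EnumMap;
import java.util.Map;

/** Simplified model of the "opposite event" lookup; all names here are hypothetical. */
final class OppositeEventSketch {
    enum ActivityEvent { CREATE, START, RESUME, PAUSE, STOP, DESTROY }

    // Event last seen at subscribe time -> event at which the stream should end.
    private static final Map<ActivityEvent, ActivityEvent> END_EVENT =
            new EnumMap<>(ActivityEvent.class);

    static {
        END_EVENT.put(ActivityEvent.CREATE, ActivityEvent.DESTROY);
        END_EVENT.put(ActivityEvent.START, ActivityEvent.STOP);
        END_EVENT.put(ActivityEvent.RESUME, ActivityEvent.PAUSE);
        // Assumed: past RESUME there is no symmetric partner, so end at the next teardown step.
        END_EVENT.put(ActivityEvent.PAUSE, ActivityEvent.STOP);
        END_EVENT.put(ActivityEvent.STOP, ActivityEvent.DESTROY);
    }

    /** Returns the event that ends a stream subscribed while lastSeen was the latest event. */
    static ActivityEvent endEventFor(ActivityEvent lastSeen) {
        ActivityEvent end = END_EVENT.get(lastSeen);
        if (end == null) {
            throw new IllegalStateException("Cannot bind to the lifecycle after " + lastSeen);
        }
        return end;
    }

    public static void main(String[] args) {
        System.out.println(endEventFor(ActivityEvent.START));  // prints STOP
        System.out.println(endEventFor(ActivityEvent.CREATE)); // prints DESTROY
    }
}
```

Keeping the mapping in one static table makes the rule easy to audit: every row answers "if I subscribe here, where does the stream end?".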
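2. bindUntilEvent()
The ActivityEvent enum defines CREATE, START, RESUME, PAUSE, STOP, and DESTROY, each corresponding to a lifecycle callback. Use bindUntilEvent to name the lifecycle callback at which the subscription is cancelled.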
Observable.interval(1, TimeUnit.SECONDS)
.compose(this.bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(mSub);
Usage
First, your Activity/Fragment must extend RxAppCompatActivity/RxFragment. The supported base classes currently include RxAppCompatActivity, RxFragment, RxDialogFragment, and RxFragmentActivity.
Then call compose() on your Observable when you use it.
Examples
1. Cancel the subscription in the Activity's onPause by calling bindUntilEvent(@NonNull ActivityEvent event)
// Specifically bind this until onPause()
// Note: example 1 (RxJava 2 callbacks, matching the rxlifecycle2 artifacts above)
Observable
    .interval(1, TimeUnit.SECONDS)
    .doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            Log.i(TAG, "Unsubscribing subscription from onCreate()");
        }
    })
    // Note: explicitly cancel the subscription when the Activity reaches onPause
    .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long num) throws Exception {
            Log.i(TAG, "Started in onCreate(), running until onPause(): " + num);
        }
    });
2. Automatic cancellation with bindToLifecycle()
// Note: example 2 (RxJava 2 callbacks)
// Using automatic unsubscription, this should determine that the correct time to
// unsubscribe is onStop (the opposite of onStart).
Observable.interval(1, TimeUnit.SECONDS)
    .doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            Log.i(TAG, "Unsubscribing subscription from onStart()");
        }
    })
    // Note: bindToLifecycle is called in onStart here, so the subscription is cancelled automatically in onStop
    .compose(this.<Long>bindToLifecycle())
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long num) throws Exception {
            Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
        }
    });
3. Cancel in the Activity's onDestroy()
// Note: example 3 (RxJava 2 callbacks)
// `this.<Long>` is necessary if you're compiling on JDK7 or below.
// If you're using JDK8+, then you can safely remove it.
Observable.interval(1, TimeUnit.SECONDS)
    .doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            Log.i(TAG, "Unsubscribing subscription from onResume()");
        }
    })
    // Note: explicitly cancel the subscription when the Activity reaches onDestroy
    .compose(this.<Long>bindUntilEvent(ActivityEvent.DESTROY))
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long num) throws Exception {
            Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num);
        }
    });
RxLifecycle Providers
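A LifecycleProvider can be passed to the presenter (the P in MVP).
RxAppCompatActivity implements LifecycleProvider directly, so its methods can be called from inside the Activity.
An ActivityLifecycleCallbacks-based implementation of RxLifecycle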
https://gist.github.com/dlew/33b650bd8ef3d360ff7d
class RxActivityLifecycleCallbacks implements Application.ActivityLifecycleCallbacks {
private static RxActivityLifecycleCallbacks instance;
private Map<Activity, BehaviorSubject<LifecycleEvent>> activityBehaviorSubjectMap;
public static final RxActivityLifecycleCallbacks getInstance(Context context) {
if (instance == null) {
instance = new RxActivityLifecycleCallbacks(context);
}
return instance;
}
private RxActivityLifecycleCallbacks(Context context) {
activityBehaviorSubjectMap = new ConcurrentHashMap<Activity, BehaviorSubject<LifecycleEvent>>();
Application application = (Application) context.getApplicationContext();
application.registerActivityLifecycleCallbacks(this);
}
public Observable<LifecycleEvent> getLifecycle(Activity activity) {
BehaviorSubject<LifecycleEvent> subject = activityBehaviorSubjectMap.get(activity);
if (subject == null) {
throw new IllegalStateException("The Activity is outside the lifecycle; cannot bind to it!");
}
return subject.asObservable();
}
@Override
public void onActivityCreated(Activity activity, Bundle savedInstanceState) {
activityBehaviorSubjectMap.put(activity, BehaviorSubject.create(LifecycleEvent.CREATE));
}
@Override
public void onActivityStarted(Activity activity) {
activityBehaviorSubjectMap.get(activity).onNext(LifecycleEvent.START);
}
@Override
public void onActivityResumed(Activity activity) {
activityBehaviorSubjectMap.get(activity).onNext(LifecycleEvent.RESUME);
}
@Override
public void onActivityPaused(Activity activity) {
activityBehaviorSubjectMap.get(activity).onNext(LifecycleEvent.PAUSE);
}
@Override
public void onActivityStopped(Activity activity) {
activityBehaviorSubjectMap.get(activity).onNext(LifecycleEvent.STOP);
}
@Override
public void onActivityDestroyed(Activity activity) {
activityBehaviorSubjectMap.remove(activity).onNext(LifecycleEvent.DESTROY);
}
@Override
public void onActivitySaveInstanceState(Activity activity, Bundle outState) {
// Not tracked
}
}
Reference
How RxLifecycle works
http://brucezz.itscoder.com/articles/2016/09/19/usage_and_principle_of_rxlifecycle/
AutoDispose2
Setup
// Java
implementation 'com.uber.autodispose2:autodispose:x.y.z'
// LifecycleScopeProvider
implementation 'com.uber.autodispose2:autodispose-lifecycle:x.y.z'
// Android extensions:
implementation 'com.uber.autodispose2:autodispose-android:x.y.z'
// Android Architecture Components extensions: adding this one pulls in all of the artifacts above
// AutoDispose 1.x
implementation 'com.uber.autodispose:autodispose-android-archcomponents:x.y.z'
// AutoDispose 2.x
implementation 'com.uber.autodispose2:autodispose-androidx-lifecycle:x.y.z'
// Androidx-Lifecycle Test extensions:
// AutoDispose 1.x
implementation 'com.uber.autodispose:autodispose-android-archcomponents-test:x.y.z'
// AutoDispose 2.x
implementation 'com.uber.autodispose2:autodispose-androidx-lifecycle-test:x.y.z'
RxLifecycle interop (AutoDispose 1.x/RxJava 2.x only)
// autodispose-rxlifecycle
implementation 'com.uber.autodispose:autodispose-rxlifecycle:x.y.z'
// autodispose-rxlifecycle3
implementation 'com.uber.autodispose:autodispose-rxlifecycle3:x.y.z'
Usage
Java
- The disposal point is determined by the lifecycle owner's state at subscribe time
Observable.interval(1, TimeUnit.SECONDS)
.to(autoDisposable(AndroidLifecycleScopeProvider.from(this)))
.subscribe();
Subscribe in onCreate → dispose in onDestroy; subscribe in onStart → dispose in onStop; subscribe in onResume → dispose in onPause.
- Dispose at a specified Lifecycle.Event
Observable.interval(1, TimeUnit.SECONDS)
.to(autoDisposable(AndroidLifecycleScopeProvider.from(this, Lifecycle.Event.ON_DESTROY)))
.subscribe();
Kotlin
- The disposal point is determined by the lifecycle owner's state at subscribe time
private val scopeProvider by lazy { AndroidLifecycleScopeProvider.from(this) }
// Activity
// Using automatic disposal, this should determine that the correct time to
// dispose is onDestroy (the opposite of onCreate).
Observable.interval(1, TimeUnit.SECONDS)
.autoDispose(scopeProvider)
.subscribeBy { }
// Fragment
Observable.interval(1, TimeUnit.SECONDS)
.autoDispose(AndroidLifecycleScopeProvider.from(viewLifecycleOwner))
.subscribeBy { }
Subscribe in onCreate → dispose in onDestroy; subscribe in onStart → dispose in onStop; subscribe in onResume → dispose in onPause.
- Dispose at a specified Lifecycle.Event
// Setting a specific untilEvent, this should dispose in onDestroy.
Observable.interval(1, TimeUnit.SECONDS)
.autoDispose(
AndroidLifecycleScopeProvider.from(this, Lifecycle.Event.ON_DESTROY))
.subscribeBy { }
ViewScopeProvider
When the View is detached from its window (View#onDetachedFromWindow), the scope's onComplete is called, which ends the current event stream.
Observable
.create(ObservableOnSubscribe<Int> {
for (i in 0..9) {
if (!it.isDisposed) {
it.onNext(i)
}
try {
Thread.sleep(1000)
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
it.onComplete()
})
.subscribeOn(io.reactivex.rxjava3.schedulers.Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.autoDispose(ViewScopeProvider.from(this))
.subscribe(
{
"onNext $it".logi()
onNext.invoke("onNext $it".log())
},
{
"onError ${it.message}".loge()
},
{
"onComplete".logd()
}
)
To write a custom ScopeProvider, use ViewScopeProvider as a reference.
Custom LifecycleScopeProvider
AutoDisposeActivity
public abstract class AutoDisposeActivity extends Activity
implements LifecycleScopeProvider<AutoDisposeActivity.ActivityEvent> {
public enum ActivityEvent {
CREATE,
START,
RESUME,
PAUSE,
STOP,
DESTROY
}
/**
* This is a function of current event -> target disposal event. That is to say that if event A
* returns B, then any stream subscribed to during A will autodispose on B. In Android, we make
* symmetric boundary conditions. Create -> Destroy, Start -> Stop, etc. For anything after Resume
* we dispose on the next immediate destruction event. Subscribing after Destroy is an error.
*/
private static final CorrespondingEventsFunction<ActivityEvent> CORRESPONDING_EVENTS =
activityEvent -> {
switch (activityEvent) {
case CREATE:
return ActivityEvent.DESTROY;
case START:
return ActivityEvent.STOP;
case RESUME:
return ActivityEvent.PAUSE;
case PAUSE:
return ActivityEvent.STOP;
case STOP:
return ActivityEvent.DESTROY;
default:
throw new LifecycleEndedException("Cannot bind to Activity lifecycle after destroy.");
}
};
private final BehaviorSubject<ActivityEvent> lifecycleEvents = BehaviorSubject.create();
@Override
public Observable<ActivityEvent> lifecycle() {
return lifecycleEvents.hide();
}
@Override
public CorrespondingEventsFunction<ActivityEvent> correspondingEvents() {
return CORRESPONDING_EVENTS;
}
@Nullable
@Override
public ActivityEvent peekLifecycle() {
return lifecycleEvents.getValue();
}
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifecycleEvents.onNext(ActivityEvent.CREATE);
}
@Override
protected void onStart() {
super.onStart();
lifecycleEvents.onNext(ActivityEvent.START);
}
@Override
protected void onResume() {
super.onResume();
lifecycleEvents.onNext(ActivityEvent.RESUME);
}
@Override
protected void onPause() {
lifecycleEvents.onNext(ActivityEvent.PAUSE);
super.onPause();
}
@Override
protected void onStop() {
lifecycleEvents.onNext(ActivityEvent.STOP);
super.onStop();
}
@Override
protected void onDestroy() {
lifecycleEvents.onNext(ActivityEvent.DESTROY);
super.onDestroy();
}
}
AutoDisposeFragment
/**
* A {@link Fragment} example implementation for making one implement {@link
* LifecycleScopeProvider}. One would normally use this as a base fragment class to extend others
* from.
*/
public abstract class AutoDisposeFragment extends Fragment
implements LifecycleScopeProvider<AutoDisposeFragment.FragmentEvent> {
public enum FragmentEvent {
ATTACH,
CREATE,
CREATE_VIEW,
START,
RESUME,
PAUSE,
STOP,
DESTROY_VIEW,
DESTROY,
DETACH
}
/**
* This is a function of current event -> target disposal event. That is to say that if event A
* returns B, then any stream subscribed to during A will autodispose on B. In Android, we make
* symmetric boundary conditions. Create -> Destroy, Start -> Stop, etc. For anything after Resume
* we dispose on the next immediate destruction event. Subscribing after Detach is an error.
*/
private static final CorrespondingEventsFunction<FragmentEvent> CORRESPONDING_EVENTS =
event -> {
switch (event) {
case ATTACH:
return FragmentEvent.DETACH;
case CREATE:
return FragmentEvent.DESTROY;
case CREATE_VIEW:
return FragmentEvent.DESTROY_VIEW;
case START:
return FragmentEvent.STOP;
case RESUME:
return FragmentEvent.PAUSE;
case PAUSE:
return FragmentEvent.STOP;
case STOP:
return FragmentEvent.DESTROY_VIEW;
case DESTROY_VIEW:
return FragmentEvent.DESTROY;
case DESTROY:
return FragmentEvent.DETACH;
default:
throw new LifecycleEndedException("Cannot bind to Fragment lifecycle after detach.");
}
};
private final BehaviorSubject<FragmentEvent> lifecycleEvents = BehaviorSubject.create();
@Override
public Observable<FragmentEvent> lifecycle() {
return lifecycleEvents.hide();
}
@Override
public CorrespondingEventsFunction<FragmentEvent> correspondingEvents() {
return CORRESPONDING_EVENTS;
}
@Nullable
@Override
public FragmentEvent peekLifecycle() {
return lifecycleEvents.getValue();
}
@Override
public void onAttach(Context context) {
super.onAttach(context);
lifecycleEvents.onNext(FragmentEvent.ATTACH);
}
@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifecycleEvents.onNext(FragmentEvent.CREATE);
}
@Override
public void onViewCreated(View view, @Nullable Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
lifecycleEvents.onNext(FragmentEvent.CREATE_VIEW);
}
@Override
public void onStart() {
super.onStart();
lifecycleEvents.onNext(FragmentEvent.START);
}
@Override
public void onResume() {
super.onResume();
lifecycleEvents.onNext(FragmentEvent.RESUME);
}
@Override
public void onPause() {
lifecycleEvents.onNext(FragmentEvent.PAUSE);
super.onPause();
}
@Override
public void onStop() {
lifecycleEvents.onNext(FragmentEvent.STOP);
super.onStop();
}
@Override
public void onDestroyView() {
lifecycleEvents.onNext(FragmentEvent.DESTROY_VIEW);
super.onDestroyView();
}
@Override
public void onDestroy() {
lifecycleEvents.onNext(FragmentEvent.DESTROY);
super.onDestroy();
}
@Override
public void onDetach() {
lifecycleEvents.onNext(FragmentEvent.DETACH);
super.onDetach();
}
}
AutoDisposeView
/**
* An example implementation of an AutoDispose View with lifecycle handling and precondition checks
* using {@link LifecycleScopeProvider}. The precondition checks here are only different from what
* {@link ViewScopeProvider} provides in that it will check against subscription in the constructor.
*/
public abstract class AutoDisposeView extends View
implements LifecycleScopeProvider<AutoDisposeView.ViewEvent> {
/**
* This is a function of current event -> target disposal event. That is to say that if event
* "Attach" returns "Detach", then any stream subscribed to during Attach will autodispose on
* Detach.
*/
private static final CorrespondingEventsFunction<ViewEvent> CORRESPONDING_EVENTS =
viewEvent -> {
switch (viewEvent) {
case ATTACH:
return ViewEvent.DETACH;
default:
throw new LifecycleEndedException("Cannot bind to View lifecycle after detach.");
}
};
@Nullable private BehaviorSubject<ViewEvent> lifecycleEvents = null;
public AutoDisposeView(Context context) {
this(context, null);
}
public AutoDisposeView(Context context, @Nullable AttributeSet attrs) {
this(context, attrs, View.NO_ID);
}
public AutoDisposeView(Context context, @Nullable AttributeSet attrs, int defStyleAttr) {
super(context, attrs, defStyleAttr);
init();
}
@RequiresApi(api = Build.VERSION_CODES.LOLLIPOP)
public AutoDisposeView(
Context context, @Nullable AttributeSet attrs, int defStyleAttr, int defStyleRes) {
super(context, attrs, defStyleAttr, defStyleRes);
init();
}
private void init() {
if (!isInEditMode()) {
// This is important to gate so you don't break the IDE preview!
lifecycleEvents = BehaviorSubject.create();
}
}
public enum ViewEvent {
ATTACH,
DETACH
}
@Override
protected void onAttachedToWindow() {
super.onAttachedToWindow();
if (lifecycleEvents != null) {
lifecycleEvents.onNext(ViewEvent.ATTACH);
}
}
@Override
protected void onDetachedFromWindow() {
super.onDetachedFromWindow();
if (lifecycleEvents != null) {
lifecycleEvents.onNext(ViewEvent.DETACH);
}
}
@SuppressWarnings("NullAway") // only null in layoutlib
@Override
public Observable<ViewEvent> lifecycle() {
//noinspection ConstantConditions only in layoutlib
return lifecycleEvents.hide();
}
@Override
public CorrespondingEventsFunction<ViewEvent> correspondingEvents() {
return CORRESPONDING_EVENTS;
}
@SuppressWarnings("NullAway") // only null in layoutlib
@Nullable
@Override
public ViewEvent peekLifecycle() {
//noinspection ConstantConditions only in layoutlib
return lifecycleEvents.getValue();
}
}
AutoDisposeViewHolder
/**
* Example implementation of a {@link androidx.recyclerview.widget.RecyclerView.ViewHolder}
* implementation that implements {@link LifecycleScopeProvider}. This could be useful for cases
* where you have subscriptions that should be disposed upon unbinding or otherwise aren't
* overwritten in future binds.
*/
public abstract class AutoDisposeViewHolder extends BindAwareViewHolder
implements LifecycleScopeProvider<AutoDisposeViewHolder.ViewHolderEvent> {
public enum ViewHolderEvent {
BIND,
UNBIND
}
private static final CorrespondingEventsFunction<ViewHolderEvent> CORRESPONDING_EVENTS =
viewHolderEvent -> {
switch (viewHolderEvent) {
case BIND:
return ViewHolderEvent.UNBIND;
default:
throw new LifecycleEndedException("Cannot use ViewHolder lifecycle after unbind.");
}
};
private final BehaviorSubject<ViewHolderEvent> lifecycleEvents = BehaviorSubject.create();
public AutoDisposeViewHolder(View itemView) {
super(itemView);
}
@Override
public CorrespondingEventsFunction<ViewHolderEvent> correspondingEvents() {
return CORRESPONDING_EVENTS;
}
@Override
public Observable<ViewHolderEvent> lifecycle() {
return lifecycleEvents.hide();
}
@Nullable
@Override
public ViewHolderEvent peekLifecycle() {
return lifecycleEvents.getValue();
}
@Override
protected void onBind() {
lifecycleEvents.onNext(ViewHolderEvent.BIND);
}
@Override
protected void onUnbind() {
lifecycleEvents.onNext(ViewHolderEvent.UNBIND);
}
}
AutoDisposeViewModel
/**
* Demo base [ViewModel] that can automatically dispose itself in [onCleared].
*/
abstract class AutoDisposeViewModel : ViewModel(), LifecycleScopeProvider<ViewModelEvent> {
// Subject backing the auto disposing of subscriptions.
private val lifecycleEvents = BehaviorSubject.createDefault(CREATED)
/**
* The events that represent the lifecycle of a [ViewModel].
*
* The [ViewModel] lifecycle is very simple. It is created
* and then allows you to clean up any resources in the
* [ViewModel.onCleared] method before it is destroyed.
*/
enum class ViewModelEvent {
CREATED, CLEARED
}
/**
* The observable representing the lifecycle of the [ViewModel].
*
* @return [Observable] modelling the [ViewModel] lifecycle.
*/
override fun lifecycle(): Observable<ViewModelEvent> {
return lifecycleEvents.hide()
}
/**
* Returns a [CorrespondingEventsFunction] that maps the
* current event -> target disposal event.
*
* @return function mapping the current event to terminal event.
*/
override fun correspondingEvents(): CorrespondingEventsFunction<ViewModelEvent> {
return CORRESPONDING_EVENTS
}
override fun peekLifecycle(): ViewModelEvent? {
return lifecycleEvents.value
}
/**
* Emit the [ViewModelEvent.CLEARED] event to
* dispose off any subscriptions in the ViewModel.
*/
override fun onCleared() {
lifecycleEvents.onNext(ViewModelEvent.CLEARED)
super.onCleared()
}
companion object {
/**
* Function of current event -> target disposal event. ViewModel has a very simple lifecycle.
* It is created and then later on cleared. So we only have two events and all subscriptions
* will only be disposed at [ViewModelEvent.CLEARED].
*/
private val CORRESPONDING_EVENTS = CorrespondingEventsFunction<ViewModelEvent> { event ->
when (event) {
ViewModelEvent.CREATED -> ViewModelEvent.CLEARED
else -> throw LifecycleEndedException(
"Cannot bind to ViewModel lifecycle after onCleared.")
}
}
}
}
AutoDispose source walkthrough
Here is a simple example of using AutoDispose:
Observable.interval(1, TimeUnit.SECONDS)
.to(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
.subscribe();
Start with .to(); in RxJava 2 the corresponding operator was named as().
// Observable
public final <R> R to(@NonNull ObservableConverter<T, ? extends R> converter) {
return Objects.requireNonNull(converter, "converter is null").apply(this);
}
public interface ObservableConverter<@NonNull T, @NonNull R> {
R apply(@NonNull Observable<T> upstream);
}
The to operator takes an ObservableConverter and calls its apply method, which converts upstream into some other type R and returns it.
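The converter idea can be tried out without RxJava. The sketch below is a toy model, not library code: `Source`, `SubscribeProxy`, and `proxyConverter` are hypothetical names, and `java.util.function.Function` stands in for ObservableConverter. It shows how `to` lets a converter replace the fluent type with a narrower proxy.

```java
import java.util.Objects;
import java.util.function.Function;

/** Toy model of the to(converter) pattern; none of these types are RxJava types. */
final class ToOperatorSketch {
    /** Stand-in for an Observable that holds a single value. */
    static final class Source<T> {
        final T value;

        Source(T value) {
            this.value = value;
        }

        /** Same shape as Observable#to: hand "this" to the converter and return its result. */
        <R> R to(Function<? super Source<T>, ? extends R> converter) {
            return Objects.requireNonNull(converter, "converter is null").apply(this);
        }
    }

    /** A narrow proxy that only exposes subscribing, so no further operators can be chained. */
    interface SubscribeProxy<T> {
        String subscribe();
    }

    /** Hypothetical converter: wraps the upstream in a SubscribeProxy. */
    static <T> Function<Source<T>, SubscribeProxy<T>> proxyConverter() {
        return upstream -> () -> "subscribed to " + upstream.value;
    }

    public static void main(String[] args) {
        SubscribeProxy<Integer> proxy =
                new Source<>(42).to(ToOperatorSketch.<Integer>proxyConverter());
        System.out.println(proxy.subscribe()); // prints "subscribed to 42"
    }
}
```

Because the result of `to` is whatever the converter returns, the call chain can end in a type that offers only subscribe-style methods.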
Now look at AutoDispose.autoDisposable, which returns an AutoDisposeConverter:
// AutoDispose
public static <T> AutoDisposeConverter<T> autoDisposable(final ScopeProvider provider) {
checkNotNull(provider, "provider == null");
return autoDisposable(completableOf(provider));
}
The AutoDisposeConverter returned by autoDisposable() implements a whole family of XxxConverter interfaces; the one that matters here is ObservableConverter.
public interface AutoDisposeConverter<T>
extends FlowableConverter<T, FlowableSubscribeProxy<T>>,
ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>>,
ObservableConverter<T, ObservableSubscribeProxy<T>>,
MaybeConverter<T, MaybeSubscribeProxy<T>>,
SingleConverter<T, SingleSubscribeProxy<T>>,
CompletableConverter<CompletableSubscribeProxy> {}
ScopeProvider
The provider parameter is a ScopeProvider:
public interface ScopeProvider {
ScopeProvider UNBOUND = Completable::never;
CompletableSource requestScope() throws Exception;
}
Its implementations include ViewScopeProvider and LifecycleScopeProvider:
- ViewScopeProvider: intended for Views
- LifecycleScopeProvider: binds to a lifecycle
LifecycleScopeProvider
- Observable<E> lifecycle(): returns the Observable of lifecycle events
- CorrespondingEventsFunction<E> correspondingEvents(): the mapping between events; it is best kept in a static field
- E peekLifecycle(): returns the last seen lifecycle event at this moment
- CompletableSource requestScope(): returns the scope whose completion triggers disposal
Upstream flow
public static <T> AutoDisposeConverter<T> autoDisposable(final ScopeProvider provider) {
checkNotNull(provider, "provider == null");
return autoDisposable(completableOf(provider));
}
Next, look at completableOf(ScopeProvider):
// Scopes
public static Completable completableOf(ScopeProvider scopeProvider) {
return Completable.defer(
() -> {
try {
return scopeProvider.requestScope();
} catch (OutsideScopeException e) {
Consumer<? super OutsideScopeException> handler =
AutoDisposePlugins.getOutsideScopeHandler();
if (handler != null) {
handler.accept(e);
return Completable.complete();
} else {
return Completable.error(e);
}
}
});
}
It uses the defer operator, so the Completable is only created when subscribe is actually called; that CompletableSource is whatever ScopeProvider#requestScope() returns.
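The effect of defer is easy to check in isolation. The following is a minimal plain-Java model, not RxJava: `Scope` and `defer` are hypothetical stand-ins for CompletableSource and Completable.defer, and the counter only exists to show when the factory runs.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Toy model of deferred creation; Scope is a hypothetical stand-in for CompletableSource. */
final class DeferSketch {
    /** Something that can be subscribed to and later signals completion. */
    interface Scope {
        void subscribe(Runnable onComplete);
    }

    /** The factory runs once per subscribe call, not when defer() itself is called. */
    static Scope defer(Supplier<Scope> factory) {
        return onComplete -> factory.get().subscribe(onComplete);
    }

    public static void main(String[] args) {
        AtomicInteger factoryCalls = new AtomicInteger();
        Scope deferred = defer(() -> {
            factoryCalls.incrementAndGet();
            return Runnable::run; // a scope that completes immediately
        });

        System.out.println(factoryCalls.get()); // prints 0: nothing has been created yet
        deferred.subscribe(() -> { });
        deferred.subscribe(() -> { });
        System.out.println(factoryCalls.get()); // prints 2: one creation per subscription
    }
}
```

For AutoDispose this matters because the lifecycle state should be read at subscribe time, not at the moment the chain is assembled.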
completableOf shows that the CompletableSource is supplied by the ScopeProvider. Next, look at autoDisposable(CompletableSource):
// AutoDispose
public static <T> AutoDisposeConverter<T> autoDisposable(final CompletableSource scope) {
checkNotNull(scope, "scope == null");
return new AutoDisposeConverter<T>() {
@Override
public ParallelFlowableSubscribeProxy<T> apply(final ParallelFlowable<T> upstream) {}
@Override
public CompletableSubscribeProxy apply(final Completable upstream) { }
@Override
public FlowableSubscribeProxy<T> apply(final Flowable<T> upstream) { }
@Override
public MaybeSubscribeProxy<T> apply(final Maybe<T> upstream) { }
@Override
public SingleSubscribeProxy<T> apply(final Single<T> upstream) { }
@Override
public ObservableSubscribeProxy<T> apply(final Observable<T> upstream) {
if (!AutoDisposePlugins.hideProxies) {
return new AutoDisposeObservable<>(upstream, scope);
}
return new ObservableSubscribeProxy<T>() {
@Override
public Disposable subscribe() {
return new AutoDisposeObservable<>(upstream, scope).subscribe();
}
@Override
public Disposable subscribe(Consumer<? super T> onNext) {
return new AutoDisposeObservable<>(upstream, scope).subscribe(onNext);
}
@Override
public Disposable subscribe(
Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return new AutoDisposeObservable<>(upstream, scope).subscribe(onNext, onError);
}
@Override
public Disposable subscribe(
Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
return new AutoDisposeObservable<>(upstream, scope)
.subscribe(onNext, onError, onComplete);
}
@Override
public void subscribe(Observer<? super T> observer) {
new AutoDisposeObservable<>(upstream, scope).subscribe(observer);
}
@Override
public <E extends Observer<? super T>> E subscribeWith(E observer) {
return new AutoDisposeObservable<>(upstream, scope).subscribeWith(observer);
}
@Override
public TestObserver<T> test() {
TestObserver<T> observer = new TestObserver<>();
subscribe(observer);
return observer;
}
@Override
public TestObserver<T> test(boolean dispose) {
TestObserver<T> observer = new TestObserver<>();
if (dispose) {
observer.dispose();
}
subscribe(observer);
return observer;
}
};
}
};
}
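AutoDisposeConverter provides the XxxConverter implementations for ParallelFlowable, Flowable, Maybe, Single, Completable, and Observable.
Our example uses the ObservableConverter branch. As the code shows, when AutoDisposePlugins.hideProxies is false the converter returns the AutoDisposeObservable directly; otherwise it returns an anonymous ObservableSubscribeProxy, which is only a proxy. Either way the work ends up in AutoDisposeObservable, which receives upstream and scope as arguments.
As analysed earlier, this scope comes from completableOf and is ultimately returned by ScopeProvider#requestScope().
In this example the provider is obtained through AndroidLifecycleScopeProvider.from(LifecycleOwner):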
// AndroidLifecycleScopeProvider
public static AndroidLifecycleScopeProvider from(
Lifecycle lifecycle, CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
return new AndroidLifecycleScopeProvider(lifecycle, boundaryResolver);
}
Now look at what requestScope returns:
// AndroidLifecycleScopeProvider
public CompletableSource requestScope() {
return LifecycleScopes.resolveScopeFromLifecycle(this);
}
Continue into LifecycleScopes#resolveScopeFromLifecycle(LifecycleScopeProvider):
// LifecycleScopes
public static <E> CompletableSource resolveScopeFromLifecycle(
final LifecycleScopeProvider<E> provider) throws OutsideScopeException {
return resolveScopeFromLifecycle(provider, true);
}
public static <E> CompletableSource resolveScopeFromLifecycle(
final LifecycleScopeProvider<E> provider, final boolean checkEndBoundary)
throws OutsideScopeException {
E lastEvent = provider.peekLifecycle(); // the last seen event
CorrespondingEventsFunction<E> eventsFunction = provider.correspondingEvents(); // the event mapping
if (lastEvent == null) {
throw new LifecycleNotStartedException();
}
E endEvent;
try {
endEvent = eventsFunction.apply(lastEvent); // map the last seen event to its endEvent, i.e. the event at which to dispose
} catch (Exception e) {
if (checkEndBoundary && e instanceof LifecycleEndedException) {
Consumer<? super OutsideScopeException> handler =
AutoDisposePlugins.getOutsideScopeHandler();
if (handler != null) {
try {
handler.accept((LifecycleEndedException) e);
// Swallowed the end exception, just silently dispose immediately.
return Completable.complete();
} catch (Throwable e1) {
return Completable.error(e1);
}
}
throw e;
}
return Completable.error(e);
}
return resolveScopeFromLifecycle(provider.lifecycle(), endEvent); // get the lifecycle Observable and pass endEvent along
}
- Get lastEvent, the most recent event at this moment
- Use CorrespondingEventsFunction to find the endEvent that corresponds to lastEvent
// LifecycleScopes
public static <E> CompletableSource resolveScopeFromLifecycle(
Observable<E> lifecycle, final E endEvent) {
@Nullable Comparator<E> comparator = null;
if (endEvent instanceof Comparable) {
//noinspection unchecked
comparator = (Comparator<E>) COMPARABLE_COMPARATOR;
}
return resolveScopeFromLifecycle(lifecycle, endEvent, comparator);
}
public static <E> CompletableSource resolveScopeFromLifecycle(
Observable<E> lifecycle, final E endEvent, @Nullable final Comparator<E> comparator) {
Predicate<E> equalityPredicate;
if (comparator != null) {
equalityPredicate = e -> comparator.compare(e, endEvent) >= 0;
} else {
equalityPredicate = e -> e.equals(endEvent);
}
return lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();
}
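The returned Completable completes only when the end event arrives; every other emission is ignored. equalityPredicate decides when that is: with a comparator, at the first event that compares greater than or equal to endEvent; without one, at the first event equal to endEvent.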
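To make the skip(1).takeUntil(...).ignoreElements() chain concrete, here is a plain-Java model that walks a list of events the same way. It is a sketch, not AutoDispose code: the `Event` enum and `completionIndex` are hypothetical, and it models only the comparator branch (enums implement Comparable, so that is the branch an enum event type would take).

```java
import java.util.Arrays;
import java.util.List;

/** Toy walk-through of skip(1).takeUntil(predicate).ignoreElements(); names are hypothetical. */
final class ScopeResolutionSketch {
    enum Event { ON_CREATE, ON_START, ON_RESUME, ON_PAUSE, ON_STOP, ON_DESTROY }

    /**
     * Returns the index of the emission at which the scope would complete, or -1 if it never does.
     * emitted.get(0) is the first emission, which skip(1) drops.
     */
    static int completionIndex(List<Event> emitted, Event endEvent) {
        for (int i = 1; i < emitted.size(); i++) {           // skip(1)
            if (emitted.get(i).compareTo(endEvent) >= 0) {   // takeUntil(equalityPredicate)
                return i;                                     // the Completable completes here
            }
            // ignoreElements(): earlier events are dropped, only completion is observed
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Event> emitted = Arrays.asList(
                Event.ON_START, Event.ON_RESUME, Event.ON_PAUSE, Event.ON_STOP, Event.ON_DESTROY);
        System.out.println(completionIndex(emitted, Event.ON_STOP)); // prints 3
    }
}
```

The `>=` comparison means a later event also ends the scope, so a stream cannot outlive its end event even if that exact event is never observed.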
Now look at AndroidLifecycleScopeProvider#lifecycle():
// AndroidLifecycleScopeProvider
private final LifecycleEventsObservable lifecycleObservable;
public Observable<Lifecycle.Event> lifecycle() {
return lifecycleObservable;
}
lifecycleObservable is a LifecycleEventsObservable:
class LifecycleEventsObservable extends Observable<Event> {
private final Lifecycle lifecycle; // the Lifecycle being observed
private final BehaviorSubject<Event> eventsObservable = BehaviorSubject.create(); // BehaviorSubject that holds the current event
@Override
protected void subscribeActual(Observer<? super Event> observer) {
AutoDisposeLifecycleObserver lifecycleObserver =
new AutoDisposeLifecycleObserver(lifecycle, observer, eventsObservable);
observer.onSubscribe(lifecycleObserver);
if (!isMainThread()) {
observer.onError(
new IllegalStateException("Lifecycles can only be bound to on the main thread!"));
return;
}
lifecycle.addObserver(lifecycleObserver);
if (lifecycleObserver.isDisposed()) {
lifecycle.removeObserver(lifecycleObserver);
}
}
static final class AutoDisposeLifecycleObserver extends MainThreadDisposable
implements LifecycleObserver {
private final Lifecycle lifecycle;
private final Observer<? super Event> observer;
private final BehaviorSubject<Event> eventsObservable;
AutoDisposeLifecycleObserver(
Lifecycle lifecycle,
Observer<? super Event> observer,
BehaviorSubject<Event> eventsObservable) {
this.lifecycle = lifecycle;
this.observer = observer;
this.eventsObservable = eventsObservable;
}
@Override
protected void onDispose() {
lifecycle.removeObserver(this);
}
@OnLifecycleEvent(Event.ON_ANY)
void onStateChange(@SuppressWarnings("unused") LifecycleOwner owner, Event event) {
if (!isDisposed()) {
if (!(event == ON_CREATE && eventsObservable.getValue() == event)) {
// Due to the INITIALIZED->ON_CREATE mapping trick we do in backfill(),
// we fire this conditionally to avoid duplicate CREATE events.
eventsObservable.onNext(event);
}
observer.onNext(event);
}
}
}
}
- The current observer is wrapped in an AutoDisposeLifecycleObserver (lifecycleObserver), a LifecycleObserver that can watch Activity/Fragment lifecycle changes
- On subscribe, lifecycleObserver is registered with the lifecycle
- AutoDisposeLifecycleObserver#onStateChange updates the current event through eventsObservable
- Whenever the Lifecycle state changes, observer.onNext(event) is called
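The role eventsObservable plays (remember the latest event and expose it through getValue()) can be modelled with a few lines of plain Java. This is a simplified, single-threaded stand-in for a BehaviorSubject written for illustration; the class name is hypothetical and it omits completion, errors, and disposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Minimal single-threaded stand-in for a BehaviorSubject; hypothetical, for illustration only. */
final class LastValueSubjectSketch<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T last; // null until the first onNext

    /** Stores the value as the current one and forwards it to every observer. */
    void onNext(T value) {
        last = value;
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(value);
        }
    }

    /** What peekLifecycle()-style code reads: the most recent value, or null if none yet. */
    T getValue() {
        return last;
    }

    /** New observers first receive the current value (if any), then all later ones. */
    void subscribe(Consumer<T> observer) {
        if (last != null) {
            observer.accept(last);
        }
        observers.add(observer);
    }

    public static void main(String[] args) {
        LastValueSubjectSketch<String> subject = new LastValueSubjectSketch<>();
        subject.onNext("ON_CREATE");

        List<String> received = new ArrayList<>();
        subject.subscribe(received::add); // replays ON_CREATE immediately
        subject.onNext("ON_START");

        System.out.println(received);           // prints [ON_CREATE, ON_START]
        System.out.println(subject.getValue()); // prints ON_START
    }
}
```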
Subscribe flow
final class AutoDisposeObservable<T> extends Observable<T> implements ObservableSubscribeProxy<T> {
private final ObservableSource<T> source;
private final CompletableSource scope;
AutoDisposeObservable(ObservableSource<T> source, CompletableSource scope) {
this.source = source;
this.scope = scope;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new AutoDisposingObserverImpl<>(scope, observer));
}
}
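Here source is the upstream Observable (Observable.interval in our example), while scope is the CompletableSource resolved earlier from LifecycleScopeProvider#lifecycle(), that is, from LifecycleEventsObservable.
subscribeActual wraps the observer in an AutoDisposingObserverImpl before subscribing it to source.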
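The wrapping step can be pictured with a small plain-Java model. It is only a sketch of the idea as I understand it (the wrapper listens to the scope and stops forwarding once the scope completes), not the real AutoDisposingObserverImpl; `Scope` and `AutoDisposingConsumer` are hypothetical names, and real disposal of the upstream subscription is reduced to a boolean flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of an auto-disposing wrapper; hypothetical types, not AutoDispose classes. */
final class AutoDisposingSketch {
    /** Stand-in for the scope: runs its listeners once when the end event arrives. */
    static final class Scope {
        private final List<Runnable> listeners = new ArrayList<>();

        void onComplete(Runnable listener) {
            listeners.add(listener);
        }

        void complete() {
            for (Runnable listener : new ArrayList<>(listeners)) {
                listener.run();
            }
            listeners.clear();
        }
    }

    /** Forwards values to the delegate until the scope completes, then drops them. */
    static final class AutoDisposingConsumer<T> implements Consumer<T> {
        private final Consumer<? super T> delegate;
        private volatile boolean disposed;

        AutoDisposingConsumer(Scope scope, Consumer<? super T> delegate) {
            this.delegate = delegate;
            scope.onComplete(() -> disposed = true); // listen to the scope before any value flows
        }

        boolean isDisposed() {
            return disposed;
        }

        @Override
        public void accept(T value) {
            if (!disposed) {
                delegate.accept(value);
            }
        }
    }

    public static void main(String[] args) {
        Scope scope = new Scope();
        List<Integer> seen = new ArrayList<>();
        AutoDisposingConsumer<Integer> observer = new AutoDisposingConsumer<Integer>(scope, seen::add);

        observer.accept(1);
        observer.accept(2);
        scope.complete();   // e.g. the Activity reached its end event
        observer.accept(3); // dropped: the wrapper is disposed

        System.out.println(seen); // prints [1, 2]
    }
}
```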
RxPermissions
RxBinding
RxBinding provides RxJava-based binding APIs for the Android platform. It applies the publish→subscribe pattern to Android widget events such as clicks and text changes. Because RxBinding turns a click listener into an Observable, the event stream can then be extended with operators.
https://github.com/JakeWharton/RxBinding
View click events
- RxView.clicks(@NonNull View view): view click events
- RxView.longClicks(@NonNull View view): view long-click events; the listener returns true
- RxView.longClicks(@NonNull View view, @NonNull Func0 handled): view long-click events
Other widgets
- RxTextView.textChanges(@NonNull TextView view): use this when you only care about the changed text from TextView's onTextChanged()
- RxTextView.textChangeEvents(@NonNull TextView view): wraps all the parameters of TextView's onTextChanged()
- RxTextView.beforeTextChangeEvents(@NonNull TextView view): corresponds to TextView's beforeTextChanged()
- RxTextView.afterTextChangeEvents(@NonNull TextView view): corresponds to TextView's afterTextChanged()
- RxCompoundButton.checkedChanges(@NonNull CompoundButton view): checked-state changes of a CheckBox
InitialValueObservable in RxBinding
public abstract class InitialValueObservable<T> extends Observable<T> {
@Override protected final void subscribeActual(Observer<? super T> observer) {
subscribeListener(observer);
observer.onNext(getInitialValue());
}
protected abstract void subscribeListener(Observer<? super T> observer);
protected abstract T getInitialValue();
public final Observable<T> skipInitialValue() {
return new Skipped();
}
private final class Skipped extends Observable<T> {
Skipped() {
}
@Override protected void subscribeActual(Observer<? super T> observer) {
subscribeListener(observer);
}
}
}
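subscribeActual() calls subscribeListener() to register the listener and then calls onNext to emit an initial value.
By default an InitialValueObservable emits its initial value as soon as it is subscribed to, so if you do not want to handle that value you must skip it. For example, when using RxBinding to observe an EditText's text changes, you need to skip the initial empty string.
Calling skipInitialValue does exactly that: the Observable it returns calls subscribeListener directly and never makes the initial onNext() call.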
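RxTextView.textChanges(edittext)
.skipInitialValue() // drop the initial (empty) text emitted on subscribe
.debounce(500, TimeUnit.MILLISECONDS) // wait for 500 ms without further changes
.observeOn(AndroidSchedulers.mainThread()) // debounce emits off the main thread
.subscribe({ text ->
textview.text = text.toString()
}, {
Log.w("test", it.localizedMessage)
})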
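The difference between subscribing normally and subscribing through skipInitialValue() can be illustrated without RxJava at all. The following is a minimal, dependency-free sketch of the same pattern; InitialValueSource, TextField, and InitialValueDemo are hypothetical names made up for this illustration and are not RxBinding classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Dependency-free model of the initial-value pattern (not RxBinding's real class). */
abstract class InitialValueSource<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();

    /** Current value of the underlying widget (hypothetical hook). */
    protected abstract T currentValue();

    /** Registers the listener, then immediately delivers the current value. */
    final void subscribe(Consumer<T> listener) {
        listeners.add(listener);
        listener.accept(currentValue());
    }

    /** Registers the listener only; the first delivery is the next real change. */
    final void subscribeSkippingInitial(Consumer<T> listener) {
        listeners.add(listener);
    }

    /** Called by the "widget" whenever its value changes. */
    protected final void publish(T value) {
        for (Consumer<T> listener : listeners) {
            listener.accept(value);
        }
    }
}

/** Stand-in for an EditText: starts empty and publishes every change. */
final class TextField extends InitialValueSource<String> {
    private String text = "";

    @Override
    protected String currentValue() {
        return text;
    }

    void setText(String newText) {
        text = newText;
        publish(newText);
    }
}

final class InitialValueDemo {
    public static void main(String[] args) {
        TextField field = new TextField();
        List<String> withInitial = new ArrayList<>();
        List<String> skipped = new ArrayList<>();
        field.subscribe(withInitial::add);
        field.subscribeSkippingInitial(skipped::add);
        field.setText("a");
        System.out.println(withInitial); // [, a]  (empty initial text, then "a")
        System.out.println(skipped);     // [a]
    }
}
```

The first subscriber sees the empty starting text before the real change, which is the behavior that usually has to be skipped for an EditText.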
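Converting a custom View's listener into an Observable
When a custom View's listener has several methods, you can either write one Observable per method, or use a single Observable and distinguish the callbacks by the emitted values.
Following the RxView implementation
Define a static factory method; it can live in a shared utility class.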
@CheckResult
@NonNull
public static InitialValueObservable<Long> timerTextViewProgress(@NonNull TimerTextView view) {
checkNotNull(view, "TimerTextView view == null");
return new TimerTextViewListenerObservable(view);
}
Write one Observable for each custom View listener; whenever a listener callback fires, call the Observer's onNext().
public class TimerTextViewListenerObservable extends InitialValueObservable<Long> {
TimerTextView mTimerTextView;
public TimerTextViewListenerObservable(TimerTextView timerTextView) {
this.mTimerTextView = timerTextView;
}
@Override
protected void subscribeListener(Observer<? super Long> observer) {
// Typed parameter instead of a raw Observer, matching the abstract method's signature.
Listener listener = new Listener(mTimerTextView, observer);
observer.onSubscribe(listener);
mTimerTextView.setOnTimerProgressListener(listener);
}
@Override
protected Long getInitialValue() {
return Long.valueOf(Progress.INIT);
}
final static class Listener extends MainThreadDisposable implements TimerTextView.OnTimerProgressListener {
private final TimerTextView view;
private final Observer<? super Long> observer;
public Listener(TimerTextView view, Observer<? super Long> observer) {
this.view = view;
this.observer = observer;
}
@Override
public void onTimerStart(TimerTextView timerTextView) {
if (!isDisposed()) {
observer.onNext(Long.valueOf(Progress.START));
}
}
@Override
public void onTimerProgress(TimerTextView timerTextView, long second) {
if (!isDisposed()) {
observer.onNext(second);
}
}
@Override
public void onTimerEnd(TimerTextView timerTextView) {
if (!isDisposed()) {
observer.onNext(Long.valueOf(Progress.END));
}
}
@Override
protected void onDispose() {
view.removeOnTimerProgressListener();
}
}
@IntDef({
Progress.INIT,
Progress.START,
Progress.END
})
@Retention(RetentionPolicy.SOURCE)
public @interface Progress {
int START = -2;
int END = -1;
int INIT = 0;
}
}
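The observable above multiplexes three listener callbacks into a single stream of Long values by using sentinel values. A minimal sketch of how a subscriber might decode them, assuming the same constants as the Progress annotation; TimerProgressDecoder and describe() are hypothetical names introduced only for this illustration.

```java
/** Sketch: decode the Long values emitted by the TimerTextView observable. */
final class TimerProgressDecoder {
    // Mirrors the Progress constants from the listing above.
    static final long START = -2;
    static final long END = -1;
    static final long INIT = 0;

    /** Maps an emitted value to a human-readable description. */
    static String describe(long value) {
        if (value == INIT) {
            return "initial value";
        }
        if (value == START) {
            return "timer started";
        }
        if (value == END) {
            return "timer ended";
        }
        return "progress: " + value + "s";
    }

    public static void main(String[] args) {
        long[] emitted = {0, -2, 1, 2, -1};
        for (long value : emitted) {
            System.out.println(describe(value));
        }
    }
}
```

One caveat of this encoding: INIT is 0, so if the timer can ever report a progress of 0 seconds, the two are indistinguishable. Choosing a negative sentinel for INIT as well would avoid the collision.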
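RxBinding examples
The snippets in this section use the RxJava 1.x functional interfaces (Action1, Func0).
Widget click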
RxView.clicks(findViewById(R.id.btn_rxbinding_button_click))
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
i++;
LogUtil.i("click...:" + i);
}
});
Widget long click
RxView.longClicks(findViewById(R.id.btn_rxbinding_button_click), new Func0<Boolean>() {
@Override
public Boolean call() {
return true;
}
}).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
Toast.makeText(RxBindingDemoActivity.this, "rx 长按", Toast.LENGTH_SHORT).show();
}
});
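Filtering repeated clicks: throttleFirst(2, TimeUnit.SECONDS) emits the first click and then ignores further clicks for the next 2 seconds, so at most one click gets through per 2-second window. This is commonly used to guard against accidental double clicks. Strictly speaking it is throttling; debounce() is a different operator that waits for a quiet period.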
RxView.clicks(findViewById(R.id.btn_start_act_rxbinding))
.throttleFirst(2, TimeUnit.SECONDS)
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
startActivity(new Intent(RxBindingDemoActivity.this, TestMultiClickActivity.class));
}
});
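To make the windowing behavior concrete, here is a dependency-free sketch that applies the same rule to a list of click timestamps. It only models the semantics described above and is not RxJava's implementation; ThrottleFirstSketch and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of throttleFirst semantics on plain timestamps (milliseconds). */
final class ThrottleFirstSketch {

    /** Returns the click times that pass a throttleFirst-style gate. */
    static List<Long> throttleFirst(long[] clickTimesMs, long windowMs) {
        List<Long> passed = new ArrayList<>();
        long windowEnd = Long.MIN_VALUE; // no window is open yet
        for (long t : clickTimesMs) {
            if (t >= windowEnd) {
                passed.add(t);            // first click opens a new window
                windowEnd = t + windowMs; // later clicks are dropped until it closes
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        long[] clicks = {0, 300, 1900, 2100, 2500, 4200};
        System.out.println(throttleFirst(clicks, 2000)); // [0, 2100, 4200]
    }
}
```

Note that each window starts at the click that was let through, not at fixed 2-second boundaries.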
Item long-click event
RxAdapterView.itemLongClicks(listView)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Toast.makeText(ListActivity.this, "item long click " + integer, Toast.LENGTH_SHORT).show();
}
});
EditText text-change events (onTextChanged())
RxTextView.textChanges(mEtRxbinding)
.subscribe(new Action1<CharSequence>() {
@Override
public void call(CharSequence charSequence) {
Toast.makeText(RxBindingDemoActivity.this, "change:" + charSequence.toString(), Toast.LENGTH_SHORT).show();
}
});
CheckBox onCheckedChanged() events
RxCompoundButton.checkedChanges(cbCheckbox)
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
mTvResult.setText("" + aBoolean);
}
});
RxRelay
https://github.com/JakeWharton/RxRelay
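Subjects without onError and onComplete
Subjects are a good bridge to non-Rx APIs, but once a Subject receives onError or onComplete it no longer accepts data.
RxRelay provides Subjects that have no onError or onComplete state: BehaviorRelay, PublishRelay, and ReplayRelay. There is no AsyncRelay, because an AsyncSubject only emits when it completes and a Relay never completes.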
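The practical consequence can be shown with two tiny hand-written hubs. This is a dependency-free sketch of the idea only, not RxJava or RxRelay code; MiniSubject and MiniRelay are hypothetical names. The accept() method name follows the Consumer-style entry point that, as far as I know, RxRelay 2.x exposes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Subject-like hub: stops delivering once a terminal event has arrived. */
final class MiniSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private boolean terminated;

    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    void onNext(T value) {
        if (terminated) {
            return; // values after onComplete() are silently dropped
        }
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }

    void onComplete() {
        terminated = true;
    }
}

/** Relay-like hub: exposes only accept(), so it cannot be terminated. */
final class MiniRelay<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    void accept(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }
}

final class RelayDemo {
    public static void main(String[] args) {
        List<Integer> fromSubject = new ArrayList<>();
        MiniSubject<Integer> subject = new MiniSubject<>();
        subject.subscribe(fromSubject::add);
        subject.onNext(1);
        subject.onComplete(); // e.g. an upstream that was bridged in has finished
        subject.onNext(2);    // lost
        System.out.println(fromSubject); // [1]

        List<Integer> fromRelay = new ArrayList<>();
        MiniRelay<Integer> relay = new MiniRelay<>();
        relay.subscribe(fromRelay::add);
        relay.accept(1);
        relay.accept(2);
        System.out.println(fromRelay); // [1, 2]
    }
}
```

Because the relay has no terminal methods at all, an accidental completion or error from a bridged source cannot shut the hub down.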
BehaviorRelay
Same as BehaviorSubject, except that it has no onError or onComplete.
PublishRelay
Same as PublishSubject, except that it has no onError or onComplete.
ReplayRelay
Same as ReplaySubject, except that it has no onError or onComplete.