Hot questions for Using EventBus in rx android

Question:

I'm trying to replace EventBus with RxAndroid.

I want pageable fragments to subscribe to and unsubscribe from an event source. These fragments are created and discarded relatively quickly, depending on how fast the user swipes to a new page.

In EventBus I was able to add an annotated callback method (i.e. @Subscribe(threadMode = ThreadMode.MAIN)) and register/unregister in the fragment's onStart/onStop methods.

With RxJava2 I now create a PublishSubject object in a class:

public static PublishSubject<List<Long>> m_psUpdatedDays = PublishSubject.create();
public static void publishUpdatedDays(List<Long> lDay) {
    m_psUpdatedDays.onNext(lDay);
}

and subscribe to this publisher in another class by calling the following in the Fragment's onStart method:

m_psUpdatedDays.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<List<Long>>() {
    @Override public void onSubscribe(Disposable d) {}
    @Override public void onNext(List<Long> longs) {
      // Update Fragment UI here
    }
    @Override public void onError(Throwable e) {}
    @Override public void onComplete() {}
});

My question is how can I unsubscribe this new Observer when the Fragment's onStop method is called by the system?

Do I need to store the Disposable object which I get in the onSubscribe and then call .dispose() on it in the onStop method?


Answer:

You can use a CompositeDisposable, which holds a collection of disposables and lets you dispose of all of them together.

Create a CompositeDisposable instance at the base fragment level and add your disposables to it as you create them.

    import io.reactivex.disposables.CompositeDisposable;

    public abstract class BaseFragment extends Fragment {
        protected final CompositeDisposable mCompositeDisposable = new CompositeDisposable();

        @Override
        public void onStop() {
            super.onStop();
            // clear() disposes everything added so far, but the container
            // still accepts new disposables afterwards. Call it in the callback
            // that mirrors where you subscribe: onStop for onStart, onPause for
            // onResume, or onDestroyView for onViewCreated.
            mCompositeDisposable.clear();
        }

        @Override
        public void onDestroy() {
            super.onDestroy();
            // dispose() disposes everything and sets isDisposed = true, so any
            // disposable added later is disposed immediately.
            mCompositeDisposable.dispose();
        }
    }

In your fragments, subscribe with subscribeWith instead of subscribe. It returns the observer you pass in, and since DisposableObserver implements Disposable, you can add it to the CompositeDisposable and dispose of it later in a lifecycle callback such as onStop or onDestroy (wherever suits you).

    public class MyFragment extends BaseFragment {

        @Override
        public void onStart() {
            super.onStart();
            // Use subscribeWith instead of subscribe: it returns the
            // DisposableObserver, which can be disposed later.
            Disposable disposable = m_psUpdatedDays
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(new DisposableObserver<List<Long>>() {
                        @Override
                        public void onNext(List<Long> longs) {
                            // Update Fragment UI here
                        }

                        @Override
                        public void onError(Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    });
            // Add the disposable to the shared CompositeDisposable.
            mCompositeDisposable.add(disposable);
        }
    }
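
The difference between clear() and dispose() described in the comments above can be illustrated with a small plain-Java model. This is only a sketch: ToyComposite and ToyCompositeDemo are hypothetical names, the cancel actions are plain Runnables, and none of it is RxJava source. It mimics the behaviour that matters here, assuming a single thread: after clear() the container can be reused, after dispose() anything added is cancelled at once.

```java
import java.util.ArrayList;
import java.util.List;

final class ToyCompositeDemo {
    public static void main(String[] args) {
        ToyComposite composite = new ToyComposite();
        composite.add(() -> System.out.println("subscription 1 cancelled"));
        composite.clear(); // like onStop: cancels subscription 1

        // Still usable after clear(): add() returns true.
        System.out.println(composite.add(() -> System.out.println("subscription 2 cancelled")));
        composite.dispose(); // like onDestroy: cancels subscription 2

        // After dispose(): the new resource is cancelled at once and add() returns false.
        System.out.println(composite.add(() -> System.out.println("subscription 3 cancelled")));
    }
}

/** Hypothetical single-threaded model of a disposable container. */
final class ToyComposite {
    private final List<Runnable> resources = new ArrayList<>();
    private boolean disposed;

    /** Stores a cancel action, or runs it immediately if the container is disposed. */
    boolean add(Runnable cancel) {
        if (disposed) {
            cancel.run();
            return false;
        }
        resources.add(cancel);
        return true;
    }

    /** Cancels everything held so far; the container stays usable. */
    void clear() {
        List<Runnable> copy = new ArrayList<>(resources);
        resources.clear();
        copy.forEach(Runnable::run);
    }

    /** Cancels everything and rejects all future additions. */
    void dispose() {
        disposed = true;
        clear();
    }

    boolean isDisposed() {
        return disposed;
    }

    int size() {
        return resources.size();
    }
}
```

This is why clear() suits a callback that can be followed by a new subscription (onStop followed by onStart), while dispose() suits the end of the fragment's life.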

Question:

I'm implementing an event bus (RxBus) with RxJava.

RxBus.java

public class RxBus {

    private static final String TAG = LogUtils.makeTag(RxBus.class);
    private static final RxBus INSTANCE = new RxBus();

    private final Subject<Object, Object> mBusSubject = new SerializedSubject<>(PublishSubject.create());

    public static RxBus getInstance() {
        return INSTANCE;
    }

    public <T> Subscription register(final Class<T> eventClass, Action1<T> onNext) {
        return mBusSubject
                .filter(new Func1<Object, Boolean>() {
                    @Override
                    public Boolean call(Object event) {
                        return event.getClass().equals(eventClass);
                    }
                })
//                .filter(event -> event.getClass().equals(eventClass))
                .map(new Func1<Object, T>() {
                    @Override
                    public T call(Object obj) {
                        return (T) obj;
                    }
                })
//                .map(obj -> (T) obj)
                .subscribe(onNext);
    }

    public void post(Object event) {
        Log.d(TAG, "Apr12, " + "post event: " + event);
        mBusSubject.onNext(event);
    }
}

Post an event from the ViewHolder of a RecyclerView:

public ViewHolder(LayoutInflater inflater, final ViewGroup parent) {
        super(inflater.inflate(R.layout.bill_card, parent, false));

        drawee = (SimpleDraweeView) itemView.findViewById(R.id.card_image);
        title = (TextView) itemView.findViewById(R.id.card_title);

        itemView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {

                Log.d(TAG, "Apr12, item clicked.");
                RxBus.getInstance().post(new ItemSelectedEvent(position));
            }
        });

        TagImageButton = (ImageButton) itemView.findViewById(R.id.tag_button);
        TagImageButton.setOnClickListener(new View.OnClickListener(){
            @Override
            public void onClick(View v) {
                Log.d(TAG, "Tag button clicked.");
                RxBus.getInstance().post(new ApplyTagForItemEvent(position));
            }
        });
    }
}

Subscribe to the events from the Fragment:

@Override
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
    super.onActivityCreated(savedInstanceState);

    mActivity = getActivity();
    Log.d(TAG, "getActivity(): " + getActivity());
    mItemClickSubscription = RxBus.getInstance().register(ItemSelectedEvent.class,
            new Action1<ItemSelectedEvent>() {
                @Override
                public void call(ItemSelectedEvent event) {
                    Log.d(TAG, "Apr12, " + "call event: " + event);
                    if (mDetail == null) {
                        if (getParentFragment() instanceof IFragmentStackHolder) {
                            IFragmentStackHolder fsh = (IFragmentStackHolder) getParentFragment();

                            Fragment details = new DetailCardFragment();
                            Bundle args = new Bundle();
                            args.putInt(ContentHolder.INDEX, event.getPosition());
                            details.setArguments(args);

                            fsh.pushFragment(details, event.getPairs());
                        }
                    }
                }
            });

    mApplyTagSubscription = RxBus.getInstance().register(ApplyTagForItemEvent.class,
            new Action1<ApplyTagForItemEvent>() {
                @Override
                public void call(ApplyTagForItemEvent event) {
                    IFragmentStackHolder fsh = (IFragmentStackHolder) getParentFragment();

                    Fragment tagApplyFragment = new TagApplyFragment();
                    Bundle args = new Bundle();
                    args.putInt(ContentHolder.INDEX, event.getPosition());
                    tagApplyFragment.setArguments(args);

                    fsh.pushFragment(tagApplyFragment, null);
                }
            }
    );
}

The problem is: when I click on itemView or TagImageButton, RxBus.post() is called only once (which is correct), but Action1.call() is invoked multiple times, and not even a constant number of times. Please see the log below.

D/**-CardContentView(31177): Apr12, item clicked.
D/**-RxBus(31177): Apr12, post event: com.*****.events.ItemSelectedEvent@1a11346e
D/**-CardDetailFragment(31177): Apr12, call event: com.*****.events.ItemSelectedEvent@1a11346e
D/**-CardDetailFragment(31177): Apr12, call event: com.*****.events.ItemSelectedEvent@1a11346e
D/**-CardDetailFragment(31177): Apr12, call event: com.*****.events.ItemSelectedEvent@1a11346e

How can I make sure it is called only once?

EDIT: I found that if Action1.call() is invoked N times on one click, it is invoked N+1 times the next time I click on the item. It seems that the observable is emitting all subsequently observed items in the history to the subscriber.


Answer:

I finally found the solution.

Very simple: I should have called mItemClickSubscription.unsubscribe(); and mApplyTagSubscription.unsubscribe(); in onStop().

The event bus is backed by a PublishSubject, which is described as:

Subject that, once an Observer has subscribed, emits all subsequently observed items to the subscriber.

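Note that PublishSubject does not record or replay past events; it only forwards each event to the subscribers attached at the moment it is posted. The duplicates come from the subscriptions themselves: every time onActivityCreated() runs, register() attaches another subscriber, and unless you unsubscribe() the old one it stays attached. A single post() is then delivered once to each live subscriber, which is why call() fires N+1 times once onActivityCreated() has run N+1 times.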
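
To see how leftover subscriptions multiply the callbacks, here is a minimal plain-Java sketch. ToyBus and DuplicateSubscriptionDemo are hypothetical stand-ins, not RxJava classes; the toy bus only mimics the relevant behaviour of a PublishSubject-backed bus, namely that there is no replay and each posted event goes once to every subscriber that is still attached. The loop assumes each simulated lifecycle pass registers one subscriber, as onActivityCreated() does in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class DuplicateSubscriptionDemo {
    /**
     * Simulates `restarts` earlier lifecycle passes, then one current
     * registration, posts a single event and returns the number of callbacks.
     */
    static int callsAfterRestarts(int restarts, boolean unsubscribeOnStop) {
        ToyBus bus = new ToyBus();
        int[] calls = {0};
        for (int i = 0; i < restarts; i++) {
            Runnable subscription = bus.register(e -> calls[0]++); // like onActivityCreated()
            if (unsubscribeOnStop) {
                subscription.run(); // like unsubscribe() in onStop()
            }
        }
        bus.register(e -> calls[0]++); // the registration that is currently live
        bus.post("ItemSelectedEvent");
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println(callsAfterRestarts(2, false)); // prints 3
        System.out.println(callsAfterRestarts(2, true));  // prints 1
    }
}

/** Toy bus: no replay, every attached subscriber receives each event once. */
final class ToyBus {
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    /** Attaches a subscriber and returns a handle that detaches it again. */
    Runnable register(Consumer<Object> onNext) {
        subscribers.add(onNext);
        return () -> subscribers.remove(onNext);
    }

    void post(Object event) {
        // Iterate over a copy so a subscriber may detach itself during delivery.
        for (Consumer<Object> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(event);
        }
    }
}
```

With two stale subscriptions left attached, one post() produces three callbacks, matching the three "call event" lines in the log; with unsubscription in place it produces one.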