UniRx初探

背景

一个unity项目中需要定义一个按钮的双击响应,
为了不引入更多变量,在网上找到了适用于Unity的响应式编程库,UniRx.
项目最近更新在2019年,大有不再维护的趋势.

简介

UniRx是一个为Unity而编写的响应式编程的框架.
正如在js上实现的rxjs.

在编程的思想上,实现了丰富的响应式编程的方法,直接使用可以减少许多逻辑性的代码.

  1. 核心的subscribe功能

  2. 丰富的操作符

    1. First 只接收一次
    2. Where (类似rxjs的filter功能)
    3. WhenAll 当多个条件都满足时

    等等

功能上,也为Unity开发中常用到的一些工具提供了支持

  1. 集成UGUI,方便按钮绑定事件等操作
  2. 集成WWW,方便网络请求
  3. 自带一个消息机制,方便不同组件之间的解偶

此外,还解决了一些其他的痛点

  1. 方便了生命周期管理,绑定的函数可以方便地取消
  2. 可以实现一个比较简单实用的MVP架构

关于该库的发展方面,
目前还看不出UniRx有类似rxjs那样,从链式调用改为pipe处理的苗头.
也发现ObservableWWW不被Unity官方所推荐,也还没有跟进Unity的UIElements.
要么是项目不准备维护了,要么是UniRx不认同Unity的发展路线.

引入

官方release里有unitypackage,下载后导入unity即可.
import assets > custom package

卖点介绍

集成UGUI事件

  1. 基础使用

    总得来说就是一切皆可 AsObservable,然后就能利用一些流的特有方法(节流,统计,过滤等等)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 一切皆可接收的 AsObservable
    MyButton.onClick
    .AsObservable()
    .Subscribe(_ => print("clicked"));

    // 除此之外可能还会针对不同的UGUI组件定义不同的快捷方式,不过看起来没快捷多少
    // Button
    MyButton.OnClickAsObservable().Subscribe(_ => print("clicked"));

    // Toggle
    MyToggle.OnValueChangedAsObservable().Subscribe(_ => print("toggle changed"));

    // InputField
    inputField.OnEndEditAsObservable().Subscribe(_ => {
    print(inputField.text);
    });
  2. 生命周期

    原版的UGUI事件有些不好用,比如按钮的点击事件,注册后就不大好取消了

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class TestController : MonoBehaviour {
    private Button _button;

    private void Start() {
    _button = GetComponent<Button>();

    _button.onClick.AddListener(() => {
    print("inline 1");
    print("inline 2");
    }); // 匿名的关系无法取消

    // 带参数的函数必须用delegate,匿名的关系无法取消
    _button.onClick.AddListener(delegate { WithPara(_button.IsActive()); });

    // 不带参数的,可以用函数名定义行为,可以取消
    _button.onClick.AddListener(NoPara);
    _button.onClick.RemoveListener(NoPara);

    // 不能精准控制,只有全部取消
    _button.onClick.RemoveAllListeners();
    }

    private void WithPara(bool isactive) {
    print("outside basic, parameter is: " + isactive);
    }

    private void NoPara() {
    print("outside short");
    }
    }

    如果使用UniRx,那么事情就变得简单了

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class RxController : MonoBehaviour {
    private Button _button;
    private CompositeDisposable _disposable;

    private void Start() {
    _button = GetComponent<Button>();
    _disposable = new CompositeDisposable();

    // 方式1: 手动取消
    IDisposable subscription = _button.OnClickAsObservable().Subscribe(_ => {
    print("1");
    print("2");
    });
    subscription.Dispose();

    // 方式二: 绑定一个列表以取消
    _button.OnClickAsObservable().Subscribe(_ => WithPara(_button.IsActive())).AddTo(_disposable);
    // 当该列表清空时,列表内每个元素关联的subscription都会取消
    _disposable.Clear();

    // 方式三: 绑定到该类
    _button.OnClickAsObservable().Subscribe(_ => NoPara()).AddTo(this);

    // 当然默认也可以不写
    }
    }
  3. 实现双击检测

    1
    2
    3
    4
    var clickStream = MyButton.OnClickAsObservable();
    clickStream.Buffer(clickStream.Throttle(TimeSpan.FromMillisecends(250)))
    .Where(xs => xs.Count >= 2)
    .Subscribe(xs => Debug.Log("count:" + xs.Count));
  4. 一段时间内仅仅第一次点击生效

    1
    2
    3
    IObservable<Unit> clickStream = _button.OnClickAsObservable();
    // 3秒内只有第一次点击生效
    clickStream.ThrottleFirst(TimeSpan.FromSeconds(3)).Subscribe(_ => print("clicked"));
  5. 绑定到text显示

    1
    2
    3
    4
    5
    6
    _input = GetComponent<InputField>();
    _text = GameObject.Find("Text").GetComponent<Text>();
    // 正常方法
    _input.OnValueChangedAsObservable().Subscribe(val => _text.text = val);
    // 快捷方式
    _input.OnValueChangedAsObservable().SubscribeToText(_text);

与www的集成

原版的网络请求用起来可能会费劲一些,需要写IEnumerator等一大篇业务无关的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void Simple() {
StartCoroutine(GetRequest("https://www.google.com"));
}

private IEnumerator GetRequest(string uri) {
using (UnityWebRequest webRequest = UnityWebRequest.Get(uri)) {
// Request and wait for the desired page.
yield return webRequest.SendWebRequest();

if (webRequest.isNetworkError) {
Debug.Log("Error: " + webRequest.error);
} else {
Debug.Log("Received: " + webRequest.downloadHandler.text.Substring(0, 100));
}
}
}

如果用UniRx的 ObservableWWW,在写法上会方便许多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 一个普通的请求
private void Func1() {
ObservableWWW.Get("http://google.co.jp/")
.Subscribe(
x => Debug.Log(x.Substring(0, 100)), // onSuccess
ex => Debug.LogException(ex)); // onError
}

// 像linq一样的写法,构造对象非常自由
private void Func2() {
var query =
from google in ObservableWWW.Get("http://google.com/")
from bing in ObservableWWW.Get("http://bing.com/")
select new {
google = google.Substring(0, 20),
bing = bing.Substring(0, 20)
};

IDisposable cancel = query.Subscribe(
x => Debug.Log(x),
ex => Debug.LogException(ex)
);

// Call Dispose is cancel.
// cancel.Dispose();
}

// 并行请求3个地址,在完成之后打印结果
private void Func3() {
IObservable<string[]> parallel = Observable.WhenAll(
ObservableWWW.Get("http://google.com/"),
ObservableWWW.Get("http://bing.com/"),
ObservableWWW.Get("http://unity3d.com/")
);

parallel.Subscribe(xs => {
Debug.Log(xs[0].Substring(0, 100)); // google
Debug.Log(xs[1].Substring(0, 100)); // bing
Debug.Log(xs[2].Substring(0, 100)); // unity
});
}

但现在Unity会警告说 ObservableWWW 已经落伍,建议用他们丝毫没有封装的 UnityWebRequest.
那也没有办法,但如果还想让网络请求变成流式,可以按照如下方法包装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 中层将返回字符的请求过程封装起来,并变成一个Observable.
private IObservable<string> GetTextureObservable(string uri) {
return Observable.Defer(() => GetTextAsync(uri).ToObservable());
}


private async UniTask<string> GetTextAsync(string uri) {
var uwr = UnityWebRequest.Get(uri);

// 底层使用UniTask的await来写异步代码
// 并使用官方的UnityWebRequest工具
await uwr.SendWebRequest();

if (uwr.isHttpError || uwr.isNetworkError)
{
// 带上处理异常的逻辑
throw new Exception(uwr.error);
}

// 返回的是字符
return uwr.downloadHandler.text;
}

多线程能力

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
IObservable<int> heavyMethod1 = Observable.Start(() => {
// 耗时处理1
Thread.Sleep(TimeSpan.FromSeconds(1));
return 10;
});

IObservable<int> heavyMethod2 = Observable.Start(() => {
// 耗时处理2
Thread.Sleep(TimeSpan.FromSeconds(3));
return 10;
});

// 等待
Observable.WhenAll(heavyMethod1, heavyMethod2)
.ObserveOnMainThread() // 等待两个处理都完成后返回主线程
.Subscribe(xs => {
// unity只有在主线程上才能操作UI元素
GameObject.Find("Text").GetComponent<Text>().text = "complete";
});

新的消息机制

  1. 单方面发送消息

    消息类

    1
    2
    3
    public class TestArgs {
    public int Value { get; set; }
    }

    发送消息

    1
    2
    3
    4
    // 放在任何地方都可以,Start只是个例子
    private void Start() {
    MessageBroker.Default.Publish(new TestArgs {Value = 1000});
    }

    接收消息

    1
    2
    // 放在任何地方都可以
    MessageBroker.Default.Receive<TestArgs>().Subscribe(x => Debug.Log(x.Value));
  2. 接收到消息后需要回复

    发送消息

    1
    2
    AsyncMessageBroker.Default.PublishAsync(new TestArgs {Value = 3000})  // 发送消息
    .Subscribe(_ => print("all observer completed")); // 所有观察者都做出响应之后

    接收消息

    1
    2
    3
    4
    5
    6
    7
    8
    AsyncMessageBroker.Default.Subscribe<TestArgs>(x => {
    // 收到信息后立即处理x
    print(x.Value);

    // 收到信息后不着急处理,等一会儿
    return Observable.Timer(TimeSpan.FromSeconds(1))
    .ForEachAsync(_ => print(x.Value));
    });

带来的一些新结构

Unity没有提供View和ModelView双向绑定的机制,
如果自行实现可能会非常浪费性能,因此大多数使用MVP的结构.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 一种可行的想法是,
// 一个Scene中的所有物品都可以是View
// 挂载在其上的脚本作为Presenter.
public class ReactivePresenter : MonoBehaviour {
// Presenter与View双向通信,也与Model双向通信
// 因此这两个的引用,Presenter都要有
private Button MyButton;
private Toggle MyToggle;
Enemy enemy = new Enemy(1000);

void Start() {
// 通常在初始化时,关联Presenter与View
MyButton = GameObject.Find("MyButton").GetComponent<Button>();
MyToggle = GameObject.Find("MyToggle").GetComponent<Toggle>();

// Presenter一方面要接收View的事件
MyButton.OnClickAsObservable().Subscribe(_ => enemy.CurrentHp.Value -= 99);
MyToggle.OnValueChangedAsObservable().SubscribeToInteractable(MyButton);

// 另一方面Presenter也要接收Model传来的事件做出响应
enemy.CurrentHp.SubscribeToText(MyText); // 此处CurrentHp是响应式的成员,因此可以使用SubscribeToText
enemy.IsDead.Where(isDead => isDead == true)
.Subscribe(_ => {
// 订阅Model的事件以操作View.
MyToggle.interactable = MyButton.interactable = false;
});
}
}


public class Enemy {
public ReactiveProperty<long> CurrentHp { get; private set; }
public ReactiveProperty<bool> IsDead { get; private set; }

public Enemy(int initialHp) {
CurrentHp = new ReactiveProperty<long>(initialHp);
// 通常一个Model会有自己的一些方法
// 比如此处的IsDead借助响应式的思想,暗含了一个血量变成0就死掉的方法.
IsDead = CurrentHp.Select(x => x <= 0).ToReactiveProperty();
}
}

在UIElements上的扩展

Plugins/UniRx/Scripts/UnityEngineBridge/UnityUIComponentExtensions.cs 中,
随便找个地方添加

1
2
3
4
5
6
7
public static IObservable<Unit> OnClickAsObservable(this UnityEngine.UIElements.Button source)
{
return Observable.FromEvent(
h => h,
h => source.clickable.clicked += h,
h => source.clickable.clicked -= h);
}

即可实现对UIElements中Button的扩展,其他也同理.
当然以后也可以考虑令起炉灶,放在 UIElementsExtensions.cs 中.

另外,此处的this关键字表示这个方法定义在后面的Button类上,之后Button的对象就可以使用该方法了

1
2
3
button.OnClickAsObservable(_ => {
print("clicked");
});

在底层调用的是
Assets/Plugins/UniRx/Scripts/Observable.Events.cs 里面的

1
2
3
4
5
6
public static IObservable<Unit> FromEvent<TDelegate>
(Func<Action, TDelegate> conversion,
Action<TDelegate> addHandler,
Action<TDelegate> removeHandler) {
return new FromEventObservable<TDelegate>(conversion, addHandler, removeHandler);
}

在VideoPlayer上的扩展

VideoPlayer 对象在视频结束后添加回调函数的方式上
官方默认使用的是

1
2
3
4
5
6
7
8
9
private void Start() {
// ...
player.loopPointReached += SomeMethod;
}


private void SomeMethod(VideoPlayer player) {
player.playbackSpeed = player.playbackSpeed / 10.0f;
}

如果想将该回调事件传向Stream中,需要定义函数如下

1
2
3
4
5
6
7
8
9
public static IObservable<UnityEngine.Video.VideoPlayer>
OnLoopPointReachedAsObservable(this UnityEngine.Video.VideoPlayer player) {
return Observable
.FromEvent<UnityEngine.Video.VideoPlayer.EventHandler, UnityEngine.Video.VideoPlayer>
(
h => (e) => h(e),
h => player.loopPointReached += h,
h => player.loopPointReached -= h);
}

可以和UIElements的扩展放在一起.
在底层调用的是

1
2
3
4
5
6
public static IObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>
(Func<Action<TEventArgs>, TDelegate> conversion,
Action<TDelegate> addHandler,
Action<TDelegate> removeHandler) {
return new FromEventObservable<TDelegate, TEventArgs>(conversion, addHandler, removeHandler);
}

可以想象, loopPointReached 后接的函数需要以 VideoPlayer 为入参类型,
那么表达在Stream的风格中,就应该是要建立一个返回 Observable<VideoPlayer> 的函数,
因此选 IObservable<TEventArgs> 作为底层函数的返回值类型.

而关于 UIElements 为什么可以选 IObservable<Unit>, 参考 Unit 类型在kotlin中的使用,
可以理解为 () 这个值,其实就是函数没有形参列表.
代表着可以使用 Subscribe( _ => xxx;) 而不是 Subscribe((VideoPlayer player) => player.xxx)

参考

  1. 官方其实给出了特别多的用例
  2. 如今的Unity中如何做网络请求
  3. 在UIElements上的扩展方法