我个人认为流这样的数据结构存在主要是为了推迟真实执行计算的时机, 仅在数据真的被需要时才会产生出来, 顺便, 它的存在也解决了计算机中如何描述’无限’的问题, 只要你在从数据源获取数据时总能取到, 那么这个数据源就可以认为是无限的.

流处理应用中, 带给我深刻的印象的算法就是质数筛选问题, 它使用递归的方式定义, 实现得相当简洁.

建议了解car与cdr之后再看这篇.

流的基础

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
; 流的使用类似于cons, car, cdr, 但是其实现上略不同
(define s (cons-stream 1 (cons-stream 2 nil)))
s
;Value: {1 ...} 该流是以1开头的

; 获取首个数据
(car s)
(stream-car s)
;Value: 1

; 获取第二个数据
(cdr s)
;Value: #[cell 13] 得到的不是流, 而是一个函数体

; 当你真的需要第二个数据时, 可以使用force得到它, 或是直接用stream-cdr的方式
(force (cdr s))
(stream-cdr s)
;Value: {2 ...} 得到以2开头的流

流的实现

从上面的例子可以看到, 流的实现中, car与stream-car的作用完全一致, cdr与stream-cdr的区别 在于调用了一次force, 因此我们只要定义好force就可以了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
; 必须以宏的形式定义, 有兴趣的读者可以看下这里: https://stackoverflow.com/a/14641015/5563477
; cons中, 我们的定义的cdr不是b, 而是一个函数, 这个函数在调用时, 会返回b
(define-syntax my-cons-stream
(syntax-rules ()
((my-cons-stream a b)
(cons a (lambda () b)))))

; force中, 我们给定y之后, 将y当做函数, 并调用
(define (my-force y)
(y)
)
(define head car)
(define (tail s)
(my-force (cdr s))
)
; head和tail就是上面的stream-car与stream-cdr

流的一些操作

下面的代码我没有考虑流会结束, 真实的代码中应该考虑这个问题.

全体正整数组成的流

这个流有个特点, 就是后一项比前一项大1, 因此可以递归的方式进行定义

1
2
3
4
5
6
7
8
9
10
11
12
13
(define (all-num n)
(my-cons-stream
n
(all-num (+ 1 n))
)
)
(define integers (all-num 1))

integers
;Value: (1 . #[compound-procedure 12])

(tail integers)
;Value: (2 . #[compound-procedure 13])

获取流的第N个元素

就是不断的获取流的下一项, 直到得到我们想要的元素

1
2
3
4
5
6
7
8
9
10
(define (my-stream-ref n s)
(if (= n 1)
(head s)
(my-stream-ref (-1+ n) (tail s))
)
)

; 当我们想要第5个元素时
(my-stream-ref 5 integers)
;Value: 5

针对这个流进行操作, 得到新的流

比如我们针对正整数流进行操作, 把其中所有的元素都变成其2倍, 就可以这么来做

1
2
3
4
5
6
7
8
9
10
11
12
13
(define (my-stream-map proc s)
(my-cons-stream
(proc (head s))
(my-stream-map proc (tail s))
)
)

(define zoom
(my-stream-map (lambda(x) (* x 2)) integers)
)
; 放大之后, 第5个数就是10
(my-stream-ref 5 zoom)
;Value: 10

筛选这个流中的数据, 得到新的流

比如我们针对正整数流进行操作, 只取其中的奇数, 就可以这么来做

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
(define (my-filter-stream pred s)
(cond
((pred (head s))
(my-cons-stream
(head s)
(my-filter-stream pred (tail s))
)
)
(else
(my-filter-stream pred (tail s))
)
)
)


(define (divisible? x y)
(= (remainder x y) 0)
)

(define odds
(my-filter-stream
(lambda (x) (not (divisible? x 2)))
integers
)
)

(my-stream-ref 5 odds)
;Value: 9 第5个奇数就是9

质数筛选算法

以下内容来自于维基百科: https://zh.wikipedia.org/wiki/%E5%9F%83%E6%8B%89%E6%89%98%E6%96%AF%E7%89%B9%E5%B0%BC%E7%AD%9B%E6%B3%95

20210831235954

20210901000048

简单来说, 如果我们得到了一个质数, 就将后续的整数中, 能整除它的所有数过滤, 这样我们得到的数列就是一个素数的数列, 使用scheme编写时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
(define (sieve s)
(my-cons-stream
(head s)
(sieve ; 筛选出后续的整数中不能整除(head s)的数, 将其作为一个流
(my-filter-stream
(lambda (x) (not (divisible? x (head s))))
(tail s))
)
)
)

; 注意, 质数是从2开始的序列, 不是1
(define prime
(sieve (all-num 2))
)

(my-stream-ref 9 prime)
; Value: 23 ; 第9个质数是23

这个递归的实现可以说是完全契合了定义.

总结

SICP中, 介绍流的时候, 老师是从电气工程的角度来看的, 电路信号是波动的, 连续的, 进过不同的整流器可以产生多种变化, 但其仍然是连续的, 这是一种流.

虽然我们的数据结构课上没有讲过它, 但流其实也是一种扩展出的数据结构, 并且它还是一种 控制逻辑. 当你将某些数据用流来表示时, 这意味着程序的其他部分也需要适应这个变化. 它将一段代码真实的执行时间推迟, 允许数据在真的使用时才会产生.

流处理是一个强大的工具, 程序可以全部流的方式来实现, 但是会带来一个问题: 你可能完全不知道一个函数会在什么时候被调用. 因此, 我个人认为这样的数据结构适合于程序的几个部分或是几个程序的交互处, 类似生产者与消费者之间, 尤其适用于多级消费者, 做一个任务就提交能有效避免后续的消费者饿死.

附: JS实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 我就不花里胡哨的去用lambda或是dispatch实现了, 二元数组就够了
// 这段代码是无法运行的, all_num定义时就会报错, cons-stream的定义需要依赖宏
// 但是JS中没有宏..
// stream_cons = function(x, y) {
// return [x, () =>y];
// }
// all_num = function(n) {
// return stream_cons(n,all_num(n+1));
// }
// all_num(3);

let stream_cons, stream_car, stream_cdr, all_num, stream_ref, stream_map, stream_filter;

stream_cons = function(x, y) {
if (typeof(y) === 'function'){
return [x, y];
} else {
return [x, () =>y];
}
}
stream_car = function(s) {
return s[0];
}
stream_cdr = function(s) {
return s[1]();
}

all_num = function(n) {
return stream_cons(
n,
() => all_num(n+1) //NOTE: 这里需要显式的延迟
);
}

stream_ref = function(N, s) {
if (N==1) return stream_car(s);
return stream_ref(N-1, stream_cdr(s));
}

integers = all_num(1);
stream_ref(5, integers);
// 5

stream_map = function(proc, s) {
return stream_cons(
proc(stream_car(s)),
() => stream_map(proc, stream_cdr(s)) // 这里需要显式的延迟
)
}
zoom = stream_map((x)=>x*2, integers);
stream_ref(5, zoom); // 10

stream_filter = function(pred, s) {
if(pred(stream_car(s))) {
return stream_cons(
stream_car(s),
() => stream_filter(pred, stream_cdr(s))
);
}
return stream_filter(pred, stream_cdr(s));
}
odds = stream_filter((x) => x%2==1, integers);
stream_ref(7, odds); // 13


sieve = function(s) {
return stream_cons(
stream_car(s),
() => sieve( // 显示的延迟
stream_filter((x) => !(x % (stream_car(s))==0), stream_cdr(s))
)
);
}
prime = sieve(all_num(2));
stream_ref(9, prime);
// 23

for(var i=1; i<=9; i++) {
console.log(stream_ref(i,prime));
}