HOME BLOG

emacs generator 使用及实现介绍

本文简单介绍了 emacs 中的 generator 的使用方法,并分析了它的实现方式。由于 generator.el 用到了 CPS,本文也就不可避免地涉及到一些 CPS 相关的知识,希望对 CPS 迷惑过的读者能通过本文理解它。

本文假设读者对 Scheme 或 Racket 有一些简单的了解,我可能会使用它们来举一些非常简单的例子。只想了解 generator 用法的读者只需要阅读本文的前两节。如果读者想要完全看完这篇文章,可能需要以下知识:

希望经过诸多 filter 你还能继续看下去(笑)。本文的 3.2, 3.3, 3.4, 3.5 是可选的,不读也不会影响对文章内容的理解。

本文使用的环境如下:

1. 什么是 generator

talk is cheap, show me the code – Linus Torvalds

废话少说,放码过来~

1.1. 简单的 generator 例子

下面是两个 generator 的例子,它们分别使用了 Python 和 JavaScript,算是我比较熟悉的两门语言了:

def fib():
    a = 0
    b = 1
    while True:
        a, b = b, a + b
        yield a

a = fib()

for i in range(0, 9):
    print(next(a))
# 1 1 2 3 4 8 13 21 34 (with newlines)

下面是一个简单的 JS generator 例子,摘自 MDN 文档:

const foo = function*() {
  yield 'a';
  yield 'b';
  yield 'c';
};

a = foo()

a.next()
// {value: 'a', done: false}
a.next()
// {value: 'b', done: false}
a.next()
// {value: 'c', done: false}
a.next()
// {value: undefined, done: true}

let str = '';
for (const val of foo()) {
    str = str + val;
}

console.log(str);
// Expected output: "abc"

不管是 Py 还是 JS,在上面的代码中都使用了 yield 关键字,它们的作用看上去和 return 没什么区别,不过比较有意思的是在 next 调用“返回”后,caller 再次对对象调用 next 时还能够回到 yield 表达式的位置并继续执行。这上面的函数( fibfoo )叫做 generator (也就是生成器)。

当然了,学过 Py 和 JS 的你应该知道 generater 只是一种 iterator (即迭代器)的方便写法罢了,上面的代码我们可以轻易使用 iterator 来实现:

class fib:
    def __init__(self):
        self.a = 0
        self.b = 1
    def __iter__(self):
        return self
    def __next__(self):
        self.a, self.b = self.b, self.a+self.b
        return self.a

a = fib()

for i in range(0, 9):
    print(next(a))

在上面这个例子中,我们在对象中保存了当前 ab 的值,在下一次 next 调用时就可以使用它们,这样就让 fib 的求取 继续 了下去。相比于自己手动构建 iterator,使用 generator 的好处是不用 显式 记录当前状态供下次调用使用,直接使用 yield 即可。以下是一些语言文档中对 generator 的描述:

A generator is a function that produces a potentially-infinite stream of values. Each time the function produces a value, it suspends itself and waits for a caller to request the next value.

Elisp manual 11.6 Generators

A function which returns a generator iterator. It looks like a normal function except that it contains yield expressions for producing a series of values usable in a for-loop or that can be retrieved one at a time with the next() function.

Usually refers to a generator function, but may refer to a generator iterator in some contexts. In cases where the intended meaning isn’t clear, using the full terms avoids ambiguity.

Python documentation generator

Generator functions provide a powerful alternative: they allow you to define an iterative algorithm by writing a single function whose execution is not continuous.

When called, generator functions do not initially execute their code. Instead, they return a special type of iterator, called a Generator. When a value is consumed by calling the generator's next method, the Generator function executes until it encounters the yield keyword.

MDN Generator functions

从以上描述来看,elisp 强调了可由 generator 生成无穷流,而且 generator 在执行到某一点时会挂起并等待 caller 的下一次请求。Python 强调的是可通过 yield 得到有用的可用于循环的序列,而 JS 中则表示我们可以使用 generator 方便地实现一些迭代算法。总结一下的话就是:generator 允许我们 保留某一点的执行状态 ,并在需要时返回以继续执行。

维基百科中对 generator 的定义如下:

In computer science, a generator is a routine that can be used to control the iteration behaviour of a loop.

Generators can be implemented in terms of more expressive control flow constructs, such as coroutines or first-class continuations. Generators, also known as semicoroutines, are a special case of (and weaker than) coroutines, in that they always yield control back to the caller (when passing a value back), rather than specifying a coroutine to jump to.

Generator (computer programming) - Wikipedia

这段引文中将 generator 称为 semicoroutine(半协程),即认为 generator 是一种特殊(弱化的)coroutine,因为它只能将控制权返还给 caller 而不是其他 coroutine(因此也叫做非对称协程)。同时它也说明了 generator 可由 coroutine 和 first-class continuation 来实现。由于能力精力所限,这里我就不介绍什么是 coroutine 了,至于 first-class continuation 也在文章的开头假设读者已经掌握了,如果读者对这些概念感兴趣的话,下面的一些文章可供阅读:

1.2. 小练习:把 generator 改写为 iterator

在上面我们使用 Python 的 iterator 和 generator 分别实现了 fib 的计算,很容易看出 generator 中的循环变成了 iterator 中对 next 的反复调用,原本隐含在函数中的 a b 变量成了 iterator 中的成员。下面我用几个小例子来介绍如何将 generator 变成 iterator,答案我已经给出来了,不过答案不唯一就是了。读者如果有兴趣可以自己试一试:

1.2.1.

多个 yield

def hello_1 (a):
    if a:
        yield a
        print ('true')
    else:
        yield a
        print ('false')

    b = 1 + 1
    yield b
    c = b + 1
    yield c
answer1
class hello1:
    def __init__(self, a):
        self.a = a
        self.b = 0
        self.c = 0
        self.state = 0
    def __iter__(self):
        return self

    def __next__(self):
        if self.state == 0:
            self.state = 1
            return self.a
        elif self.state == 1:
            if self.a is not False:
                print('true')
            else:
                print('false')
            self.b = 1 + 1
            self.state = 2
            return self.b
        elif self.state == 2:
            self.c = self.b + 1
            self.state = 3
            return self.c
        else:
            raise StopIteration

a = hello_1(True)
b = hello1(True)

c = hello_1(False)
d = hello1(False)

print(list(a))
print(list(b))
print(list(c))
print(list(d))

因为要多次返回,这里我使用了类似 C 语言中的 switch 语句将代码“切”成了一段一段,然后根据状态变量 self.state 来决定要执行哪一段。

1.2.2.

经典的 fact 函数,只不过这里使用 yield 来检查循环条件值:

def fact(n):
    prod = 1
    while n > 0:
        yield n
        prod = prod * n
        n = n - 1
    yield prod
answer2
class fact1:
    def __init__(self, n):
        self.n = n
        self.prod = 1
        self.state = 0
    def __iter__(self):
        return self
    def __next__(self):
        if self.state == 0:
            if self.n > 0:
                self.state = 1
                return self.n
            else:
                self.state = 2
                return self.prod
        elif self.state == 1:
            self.prod = self.prod * self.n
            self.n = self.n - 1
            if self.n > 0:
                self.state = 1
                return self.n
            else:
                self.state = 2
                return self.prod
        else:
            raise StopIteration

a = fact(10)
b = fact1(10)
print(list(a))
print(list(b))

1.2.3.

上面我们使用类 switch 的方法来选择执行的代码,现在让我们换一种写法,用函数而不是状态变量来表达状态转移:

def branch():
    a = (yield)
    if a:
        b = (yield)
        if b:
            yield 1
        else:
            yield 2
    else:
        yield 3

def runit(c1, c2):
    a = branch()
    b = []
    b.append(a.send(None))
    b.append(a.send(c1))
    if c1 is True:
        b.append(a.send(c2))
    print(b)

runit(True, True)
runit(True, False)
runit(False, None)

# [None, None, 1]
# [None, None, 2]
# [None, None, 3]

对于上面的代码,由于不能通过 next 向 iterator 传递参数,我们需要定义自己的迭代函数而不是 __next__ 来接受额外的参数,不过这样就用不了 next 了。

answer3
class branch1:
    def __init__(self):
        self.curval = False
        self.func = self.init_fun
        self.a = None
        self.b = None

    def init_fun(self):
        self.func = self.fun_0
        return
    def fun_0(self):
        self.func = self.fun_1
        self.a = self.curval
        if self.a is True:
            self.func = self.fun_1
            return
        else:
            self.func = self.endfun
            return 3
    def fun_1(self):
        self.b = self.curval
        self.func = self.endfun
        if self.b is True:
            return 1
        else:
            return 2
    def endfun(self):
        raise StopIteration
    def mynext(self, v=None):
        if v is not None:
            self.curval = v
        return self.func()

    def runit(c1, c2):
    a = branch1()
    b = []
    b.append(a.mynext())
    b.append(a.mynext(c1))
    if c1 is True:
        b.append(a.mynext(c2))
    print(b)

runit(True, True)
runit(True, False)
runit(False, None)

不论是采用状态变量的方法还是函数调用的方法,在某个状态/函数的末尾我们都存储了 下一步要干什么 的信息,它可以是一个对象内可见的状态变量,也可以是一个将要调用的函数对象。不管是哪一种方式,我们都实现了 状态的保存与转移 (这里的保存指的是存储在对象成员中的数据)。

这个 下一步干什么 有个正式的名字,那就是 continuation。通过对 generator 去糖,我们将它显式地暴露了出来。那么把这个 continuation 揪出来的目的是什么呢?那自然是通过它来 控制 代码的执行流程,当 iterator 执行到表达式中的 yield 处时,我们获得了这一点的 continuation 并返回到 caller,caller 下次调用 iterator 时我们能直接回到这一点。

2. emacs generator 的使用

整个 generator 文档只有一页,就使用上来说 emacs 的 generator 并不复杂,只是几个函数和宏而已。

我们可以使用 iter-defun 来定义 generator,它的用法就像普通的 defun 一样,只不过调用它并不会直接对 body 求值,而是返回一个 iterator 对象。通过对这个 iterator 调用 iter-next 我们可以获取它其中的值,在 body 结束后,再次调用 iter-next 时会触发 iter-end-of-sequence 信号,来表示 iterator 结束了。

(setq lexical-binding t)
(iter-defun foo123 ()
  (iter-yield 1)
  (iter-yield 2)
  (iter-yield 3))

(setq a (foo123))
(iter-next a) => 1
(iter-next a) => 2
(iter-next a) => 3
(iter-next a) => Debugger entered--Lisp error: (iter-end-of-sequence)

(iter-defun foo+1 ()
  (let ((a (iter-yield 1)))
    (iter-yield (+ a 1))))

(setq b (foo+1))

(iter-next b) => 1
(iter-next b 30) => 31
(iter-next b) => Debugger entered--Lisp error: (iter-end-of-sequence)

注意上面的 lexical-binding ,只有在开启词法作用域的情况下才能够使用 generator。除了使用 iter-defun 外,我们也可以使用 iter-lambda 来创建匿名 generator:

(setq lexical-binding t)
(setq a (funcall (iter-lambda ()
                   (iter-yield 1)
                   (iter-yield 2)
                   (iter-yield 3))))

(iter-next a) => 1
;;...

代码示例中的 iter-yield 和上一节中的 yield 关键字功能类似,都是从当前点回到调用处,没什么好说的。不过 generator.el 还提供了一个叫做 iter-yield-from 的宏,它可以从其他 iterator 获取迭代值来作为当前迭代器的 yield 值:

(setq lexical-binding t)
(setq a (iter-lambda ()
          (iter-yield 1)
          (iter-yield 2)
          (iter-yield 3)))

(setq b (iter-lambda (it)
          (iter-yield 4)
          (iter-yield-from it)
          (iter-yield 5)))

(setq c (funcall a))
(setq d (funcall b c))

(iter-next d) => 4
(iter-next d) => 1
(iter-next d) => 2
(iter-next d) => 3
(iter-next d) => 5
(iter-next d) => Debugger entered--Lisp error: (iter-end-of-sequence)

如果我们要手动调用 iter-next 的话,那么我们也需要处理 iter-end-of-sequence 这个 signal。generator.el 为我们提供了一个 iter-do 宏,帮我们进行了处理:

(setq lexical-binding t)
(setq a (iter-lambda ()
          (iter-yield 1)
          (iter-yield 2)
          (iter-yield 3)))

(setq b nil)
(iter-do (i (funcall a))
  (push i b))
b => (3 2 1)

参考文档,如果我们想手动处理这个 signal 的话,我们可以这样做:

(setq lexical-binding t)
(setq a (iter-lambda ()
          (iter-yield 1)
          (iter-yield 2)
          (iter-yield 3)
          4))

(setq flag t)
(setq it (funcall a))
(while flag
  (condition-case x
      (iter-next it)
    (iter-end-of-sequence
     (progn (print x)
            (setq flag nil)))))
=> print (iter-end-of-sequence . 4)

当然,我们也可以使用 cl-loop 中的 iter-by 关键字来处理 iterator:

(setq lexical-binding t)
(setq a (iter-lambda ()
          (iter-yield 1)
          (iter-yield 2)
          (iter-yield 3)))

(cl-loop for n iter-by (funcall a)
         collect n)
=> (1 2 3)

最后值得一说的是 iter-close 这个函数,根据文档的描述,如果 iterator 在一个 unwind-protect 挂起且变得不可达(unreachable,指没法指向它了),那么 emacs 会在一次 gc 后执行 unwind 的 handler 部分。为了让 handler 在 gc 之前执行,我们可以使用 iter-close 关闭 iterator。文档中还提到在 unwind-protectunwindforms 部分使用 iter-yield 是不行的。

3. 一个简单的 CPS 变换实现

在上面的 Python generator 转 iterator 小练习中我们已经尝试了比较简单的手动 CPS 变换,这里我打算写一个非常简单的 CPS 变换函数来帮助读者了解 CPS 变换是如何进行的,同时也是为下一节介绍 generator.el 的实现做准备。

3.1. 更加“函数式”的 CPS 变换

简单起见,这里我就使用了“代码即数据”的 Scheme 语言。与命令式的赋值不同,我们可以把 continuation 作为函数的一个参数,在函数完成它的执行后的末尾来调用这个 continuation,就像这样:

;; scheme/racket
;; calculate 1 + 1 and * 2
;; (+ 1 1) => 2
;; (* 2 2) => 4

(define (+* a b k) ;k is continuation
  (k (+ a b)))

(define (** a b k)
  (k (* a b)))

;; origin
(* 2 (+ 1 1))

;; imperative
(let ((val '())
      (state '())
      (fun1 '()))
  (set! fun1 (lambda ()
               (set! val (* val 2))
               (set! state #f)))
  (set! state (lambda ()
                (set! val (+ 1 1))
                (set! state fun1)))
  (let loop ()
    (if state (begin (state) (loop))
        val)))
=> 4

;; functional
(+* 1 1
    (lambda (ans)
      (** ans 2
          (lambda (x) x) ; do nothing, just return it
          )))
=> 4

下面是对 fib 函数的手动 CPS 变换,通过它我们可以看到函数中各表达式的求值顺序:

(setq lexical-binding t)
(defun fib (n)
  (cond
   ((= n 0) 0)
   ((= n 1) 1)
   (t
    (+ (fib (- n 1))
       (fib (- n 2))))))

(fib 30) => 832040

(defun =* (a b k)
  (funcall k (= a b)))
(defun +* (a b k)
  (funcall k (+ a b)))
(defun -* (a b k)
  (funcall k (- a b)))

(defun fib* (n k)
  (=* n 0
      (lambda (k1)
        (if k1 (funcall k 0)
          (=* n 1
              (lambda (k2)
                (if k2 (funcall k 1)
                  (-* n 1
                      (lambda (k3)
                        (fib* k3
                              (lambda (k4)
                                (-* n 2
                                    (lambda (k5)
                                      (fib* k5
                                            (lambda (k6)
                                              (+* k4 k6
                                                  (lambda (k7)
                                                    (funcall k k7))))))))))))))))))

(fib* 8 'identity) => 21

由于会爆栈,这个 CPS 变换后的 fib* 只能算到 fib(8) (elisp 真弱啊,不过改成命令式就没事了)。这个 fib* 并不完全等价于 fib ,在 fib 中我们并不知道 fib(n-1)fib(n-2) 的运算顺序,但是在 fib* 中我显式指定了前者要在后者之前。也就是说,通过 CPS 变换,我们可以 指定求值顺序

读者有兴趣的话可以在 JS 中试着写一个这样的 fib* 函数出来,这里我就不写了,不过我曾经在 ReScript 中试过一次:

1.png

其中, yeq 判断两数字是否相等, ysb 做减法, yad 做加法,它们都有 k 参数。

3.2. 对 lambda calculus 实现 CPS 变换

我的 CPS 变换启蒙应该是这一封公共邮件:Self-application as the fixpoint of call/cc,这里有一份整理后的 txt 文件,不过我已经忘了我从哪里拿到的了(笑)。很早之前我也看过这样一篇文章:CPS 变换与 CPS 变换编译,最近又看了 Matt Might 的 CPS 文章,对我来说,是时候解决掉这些还没有看完的东西了。接下来的内容我对上面的文章都有参考。

这是 lambda calculus 的定义:

<λexp> :: <var>
       | λ <var> . <λexp>
       | ( <λexp> . <λexp> )

CPS 变换可由如下公式给出( [[]] 表示进行 CPS 变换):

\[\begin{align*}[[x]] &= \lambda k.kx \\ [[\lambda x.M]] &= \lambda k.k(\lambda x. [[M]]) \\ [[M N]] &= \lambda k. [[M]] (\lambda m. [[N]] (\lambda n.(mn)k))\end{align*}\]

首先,对于简单的值,它的 CPS 变换结果就是在外面套一层 lambda ,比如对 a 的 CPS 变换就是 (lambda (k) (k a))

(CPS 'a) => (lambda (k) (k a))

对函数表达式,那自然也是外面包一层 lambda(k) ,但是我们不能仅对整个函数做变换,函数体也需要 CPS 变换,所以:

(CPS '(lambda (x) M))
=>
(lambda (k) (k (lambda (x) (CPS M))))

(lambda (x) x)
=>
(lambda (k1)
  (k1 (lambda (x)
       (lambda (k2)
         (k2 x)))))

最后是对函数应用的变换,由于这里所有函数都是单参的,所以我们只需要分别对函数和参数进行 CPS 变换即可,然后把变换后得到的东西组合起来,再用 lambda(k) 包起来:

(CPS '(x y))
=>
(lambda (k1)
  ((CPS x)
   (lambda (fun)
     ((CPS y)
      (lambda (arg)
        ((fun arg) k1))))))
=>
(lambda (k1)
  ((lambda (k2) (k2 x))
   (lambda (fun)
     ((lambda (k3) (k3 y))
      (lambda (arg)
        ((fun arg) k1))))))

这一段可能有些烧脑,你可能会想为什么结果不是 ((CPS x) (CPS y)) ,这是因为经过 CPS 变换后函数多了一个 k 参数,我们需要额外的结构对它进行处理。

现在我们就完成了对基本 lambda 演算表达式的 CPS 变换,我们可以用 racket 的模式匹配简单写个 CPS 变换函数:

(define (mycps ls)
  (match ls
         [(list a b)
          (define $f (gensym '$f))
          (define $e (gensym '$e))
          (define $k (gensym '$k))
          `(λ (,$k)
             (,(mycps a)
              (λ (,$f)
                (,(mycps b)
                 (λ (,$e)
                   ((,$f ,$e) ,$k))))))]
         [(list a b c)
          (define $k (gensym '$k))
          `(λ (,$k) (,$k (λ ,b ,(mycps c))))]
         [p
          (define $k (gensym '$k))
          `(λ (,$k) (,$k ,p))]))


(mycps 'x) =>
'(λ ($k1) ($k1 x))

(mycps '(λ (x) x)) =>
'(λ ($k1) ($k1 (λ (x) (λ ($k2) ($k2 x)))))

(mycps '(x y)) =>
'(λ ($k1)
   ((λ ($k2) ($k2 x))
    (λ ($f1) ((λ ($k3) ($k3 y))
              (λ ($e1) (($f1 $e1) $k1))))))

((λ (x) (x x)) (λ (x) 1)) => 1

(mycps '((λ (x) (x x)) (λ (x) 1))) =>
'(λ ($k1)
   ((λ ($k2)
      ($k2 (λ (x)
             (λ ($k3)
               ((λ ($k4) ($k4 x))
                (λ ($f2) ((λ ($k5) ($k5 x))
                          (λ ($e2) (($f2 $e2) $k3)))))))))
    (λ ($f1)
      ((λ ($k6)
         ($k6 (λ (x)
                (λ ($k7) ($k7 1)))))
       (λ ($e1) (($f1 $e1) $k1))))))

((λ ($k1)
   ((λ ($k2)
      ($k2 (λ (x)
             (λ ($k3)
               ((λ ($k4) ($k4 x))
                (λ ($f2) ((λ ($k5) ($k5 x))
                          (λ ($e2) (($f2 $e2) $k3)))))))))
    (λ ($f1)
      ((λ ($k6)
         ($k6 (λ (x)
                (λ ($k7) ($k7 1)))))
       (λ ($e1) (($f1 $e1) $k1))))))
 (λ (x) x))
=> 1

(这里对生成的符号进行了简化, gensym 生成的符号太长了)。

当然,使用 elisp 的 pcase 也能很容易地实现上面的代码,这里还是用 elisp 实现一下做个补充。由于 elisp 没有尾递归优化,生成的代码没什么实际意义,更何况这是生成的 scheme-style 代码:

(defun mycps2 (ls)
  (pcase ls
    (`(λ ,x ,y)
     (let ((k (gensym "$k")))
       `(λ (,k) (,k (λ ,x ,(mycps2 y))))))
    (`(,appop ,apped)
     (let ((k (gensym "$k"))
           (f (gensym "$f"))
           (e (gensym "$e")))
       `(λ (,k) (,(mycps2 appop)
                 (λ (,f)
                   (,(mycps2 apped)
                    (λ (,e)
                      ((,f ,e) ,k))))))))
    ((pred atom)
     (let ((k (gensym "$k")))
       `(λ (,k) (,k ,ls))))))

由于 gensym 不论是在 Racket 还是 elisp 中生成的符号可读性不是太好(序号太大了),这里我再补充两个辅助函数,分别用于 Racket 和 elisp,用来对每次变换生成可读性较好的符号:

(define (ease-gensym-gen)
  (let ((h (make-hash)))
    (λ (sym)
      (let* ((num (hash-ref h sym
                            (λ ()
                              (hash-set! h sym 0)
                              0)))
             (str (string-append (symbol->string sym)
                                 (number->string (+ num 1))))
             (s (string->uninterned-symbol str)))
        (hash-set! h sym (+ num 1))
        s))))

(define (mycps ls)
  (let ((gen (ease-gensym-gen)))
    (define (cps ls)
      (match ls
             [(list a b)
              (define $f (gen '$f))
              (define $e (gen '$e))
              (define $k (gen '$k))
              `(λ (,$k)
                 (,(cps a)
                  (λ (,$f)
                    (,(cps b)
                     (λ (,$e)
                       ((,$f ,$e) ,$k))))))]
             [(list a b c)
              (define $k (gen '$k))
              `(λ (,$k) (,$k (λ ,b ,(cps c))))]
             [p
              (define $k (gen '$k))
              `(λ (,$k) (,$k ,p))]))
    (cps ls)))

下面是 elisp 版本:

(defun ease-gensym-gen ()
  (let ((h (make-hash-table)))
    (lambda (sym)
      (let* ((num (gethash sym h 0))
             (str (concat (symbol-name sym)
                          (number-to-string (1+ num))))
             (s (make-symbol str)))
        (puthash sym (1+ num) h)
        s))))

(defun mycps2 (ls)
  (let ((gen (ease-gensym-gen)))
    (cl-labels
        ((cps (ls)
              (pcase ls
                (`(λ ,x ,y)
                 (let ((k (funcall gen '$k)))
                   `(λ (,k) (,k (λ ,x ,(cps y))))))
                (`(,appop ,apped)
                 (let ((k (funcall gen '$k))
                       (f (funcall gen '$f))
                       (e (funcall gen '$e)))
                   `(λ (,k) (,(cps appop)
                             (λ (,f)
                               (,(cps apped)
                                (λ (,e)
                                  ((,f ,e) ,k))))))))
                ((pred atom)
                 (let ((k (funcall gen '$k)))
                   `(λ (,k) (,k ,ls)))))))
      (cps ls))))

来个 Y combinator 的 CPS conversion:

(mycps '(λ (f)
          ((λ (x)
             (λ (n) ((f (x x)) n)))
           (λ (x)
             (λ (n) ((f (x x)) n))))))

'(λ ($k1)
   ($k1 (λ (f)
          (λ ($k2)
            ((λ ($k3)
               ($k3 (λ (x)
                      (λ ($k4)
                        ($k4 (λ (n)
                               (λ ($k5)
                                 ((λ ($k6)
                                    ((λ ($k7) ($k7 f))
                                     (λ ($f3)
                                       ((λ ($k8)
                                          ((λ ($k9) ($k9 x))
                                           (λ ($f4) ((λ ($k10) ($k10 x))
                                                     (λ ($e4) (($f4 $e4) $k8))))))
                                        (λ ($e3) (($f3 $e3) $k6))))))
                                  (λ ($f2) ((λ ($k11) ($k11 n))
                                            (λ ($e2) (($f2 $e2) $k5))))))))))))
             (λ ($f1)
               ((λ ($k12)
                  ($k12 (λ (x)
                          (λ ($k13)
                            ($k13 (λ (n)
                                    (λ ($k14)
                                      ((λ ($k15)
                                         ((λ ($k16) ($k16 f))
                                          (λ ($f6) ((λ ($k17)
                                                      ((λ ($k18) ($k18 x))
                                                       (λ ($f7) ((λ ($k19) ($k19 x))
                                                                 (λ ($e7) (($f7 $e7) $k17))))))
                                                    (λ ($e6) (($f6 $e6) $k15))))))
                                       (λ ($f5) ((λ ($k20) ($k20 n))
                                                 (λ ($e5) (($f5 $e5) $k14))))))))))))
                (λ ($e1) (($f1 $e1) $k2)))))))))

就这样,我们实现了最基本的 lambda 演算 CPS 变换,不过从结果来看比较复杂的表达式的可读性几乎没有,参考 Matt Might 的文章有一种更高明的方法,这里我对他的代码略作修改来得到如下的 elisp 代码,他喜欢把 continuation 放在第一参数位置,我比较习惯放到最后。以下代码是 Matt Might 代码的一种变体:

(defun mycps3 (expr k)
  (let ((gen (ease-gensym-gen)))
    (cl-labels
        ((T (expr k)
            (pcase expr
              ((pred atom) `(,k ,expr))
              (`(λ (,v) ,e)
               (let (($k (funcall gen '$k)))
                 `(,k (λ (,v) (λ (,$k) ,(T e $k))))))
              (`(,f ,e)
               (let (($f (funcall gen '$f))
                     ($e (funcall gen '$e)))
                 (T f `(λ (,$f)
                         ,(T e `(λ (,$e)
                                  ((,$f ,$e) ,k))))))))))
      (T expr k))))

之所以能这样做是基于以下事实的:(CSCI B552 Lecture 14

\[\begin{align*}[[x]]\ k &= kx\\ [[λx.e]]\ k &= k (λx.λk.[[e]]k) \\ [[e0 \ e1]]\ k &= [[e0]]\ (λf.[[e1]](λv.(f v) k)) \end{align*}\]

这里也放上最初的 CPS 变换方便对比:

\[\begin{align*}[[x]] &= \lambda k.kx \\ [[\lambda x.M]] &= \lambda k.k(\lambda x. [[M]]) \\ [[M N]] &= \lambda k. [[M]] (\lambda m. [[N]] (\lambda n.(mn)k))\end{align*}\]

注意上面的第二条规则,它对函数体 e 进行了 eta-conversion,也就是由 e 变成了 λx.ex ,很明显这样做是为了提取子结构 [[e]]k 方便进一步化简。

我们可以观察一下简化前后的效果:

(mycps2 '((λ (x) x) x))
(λ ($k1) ((λ ($k2) ($k2 (λ (x) (λ ($k3) ($k3 x)))))
          (λ ($f1) ((λ ($k4) ($k4 x)) (λ ($e1) (($f1 $e1) $k1))))))
(mycps3 '((λ (x) x) x) 'id)
((λ ($f1) ((λ ($e1) (($f1 $e1) id)) x))
 (λ (x) (λ ($k1) ($k1 x))))

(mycps2 '(λ (x) (x x)))
(λ ($k1)
  ($k1 (λ (x)
         (λ ($k2)
           ((λ ($k3) ($k3 x))
            (λ ($f1) ((λ ($k4) ($k4 x))
                      (λ ($e1) (($f1 $e1) $k2)))))))))
(mycps3 '(λ (x) (x x)) 'id)
(id
 (λ (x)
   (λ ($k1)
     ((λ ($f1)
        ((λ ($e1)
           (($f1 $e1) $k1))
         x))
      x))))

虽然现在生成的代码短了不少,但很明显还有优化的空间,比如上面的 (M '(λ (x) (x x))) 中间生成了很多不必要的调用,既然参数都是原子了那应该直接消掉。我们可以考虑使用 beta-reduction 化简,也就是在生成过程中进行函数调用来直接消除,Matt Might 的实现非常的妙,这里就直接给出微调过的代码了:

(defun mycps4 (expr k)
  (let ((gen (ease-gensym-gen)))
    (cl-labels
        ((T (expr k)
            (pcase expr
              ((pred atom) `(,k ,expr))
              (`(λ (,v) ,e)
               (let (($k (funcall gen '$k)))
                 `(,k (λ (,v) (λ (,$k) ,(T e $k))))))
              (`(,f ,e)
               (T-a f (lambda ($f)
                        (T-a e (lambda ($e)
                                 `((,$f ,$e) ,k))))))))
         (T-a (expr k)
              (pcase expr
                ((pred atom) (funcall k expr))
                (`(λ (,v) ,e)
                 (let (($k (funcall gen '$k)))
                   (funcall k `(λ (,v) (λ (,$k) ,(T e $k))))))
                (`(,f ,e)
                 (let* (($r (funcall gen '$r))
                        (kout `(λ (,$r) ,(funcall k $r))))
                   (T-a f (lambda ($f)
                            (T-a e (lambda ($e)
                                     `((,$f ,$e) ,kout))))))))))
      (T expr k))))

mycps3 不同的是, mycps4 对 apply 操作提供了一个辅助函数 T-a 用来消除多余的中间调用,它传递给 T-a 的第二参数不再是一个 list 而是可调用的 continuation。

现在再来看看对 ((λ (x) x) x) 的 CPS 变换:

(mycps4 '((λ (x) x) x) 'id)
(((λ (x) (λ ($k1) ($k1 x))) x) id)
(mycps3 '((λ (x) x) x) 'id)
((λ ($f1) ((λ ($e1) (($f1 $e1) id)) x)) (λ (x) (λ ($k1) ($k1 x))))

;; racket
(define id identity)
(define x 1)
(((λ (x) (λ ($k1) ($k1 x))) x) id) => 1
((λ ($f1) ((λ ($e1) (($f1 $e1) id)) x)) (λ (x) (λ ($k1) ($k1 x)))) => 1

以上我们就写出了一个还不错的 CPS 变换,我们以 Y combinator 的 CPS 变换结果作为这一小节的结尾吧:

;; Y combinator
(λ (f)
  ((λ (x) (λ (n) ((f (x x)) n)))
   (λ (x) (λ (n) ((f (x x)) n)))))

(mycps4 '(λ (f)
           ((λ (x)
              (λ (n) ((f (x x)) n)))
            (λ (x)
              (λ (n) ((f (x x)) n)))))
        'id)
(id (λ (f)
      (λ ($k1)
        (((λ (x) (λ ($k2)
                   ($k2 (λ (n) (λ ($k3) ((x x) (λ ($r2) ((f $r2) (λ ($r1) (($r1 n) $k3))))))))))
          (λ (x) (λ ($k4)
                   ($k4 (λ (n) (λ ($k5) ((x x) (λ ($r4) ((f $r4) (λ ($r3) (($r3 n) $k5)))))))))))
         $k1))))

我们可以使用 Y combinator 实现的阶乘函数检验上面得到的 Y 是否正确,下面是 racket 中实现的 CPS 变换代码:

;; racket

(define (atom? x)
  (and (not (null? x))
       (not (pair? x))))

(define (mycps-4 expr k)
  (let ((gen (ease-gensym-gen)))
    (letrec
        ([T (lambda (expr k)
              (match expr
                     [(? atom?) `(,k ,expr)]
                     [`(λ (,v) ,e)
                      (let (($k (gen '$k)))
                        `(,k (λ (,v) (λ (,$k) ,(T e $k)))))]
                     [`(,f ,e)
                      (T-a f (λ ($f)
                               (T-a e (λ ($e)
                                        `((,$f ,$e) ,k)))))]))]
         [T-a (lambda (expr k)
                (match expr
                       [(? atom?) (k expr)]
                       [`(λ (,v) ,e)
                        (let (($k (gen '$k)))
                          (k `(λ (,v) (λ (,$k) ,(T e $k)))))]
                       [`(,f ,e)
                        (let* (($r (gen '$r))
                               (kout `(λ (,$r) ,(k $r))))
                          (T-a f (λ ($f)
                                   (T-a e (λ ($e)
                                            `((,$f ,$e) ,kout))))))]))])
      (T expr k))))

这是 fact 的原型:

(define id identity)

(define fa♂
  (λ (f)
    (λ (k1)
      (k1 (λ (n)
            (λ (k2)
              (if (= n 0) (k2 1)
                  ((f (- n 1))
                   (λ (v) (k2 (* v n)))))))))))
;; test
(define f
  (λ (x)
    (λ (k2)
      (k2 x))))

((fa♂ f)
 (λ (fn)
   ((fn 10)
    id)))
=> 90

这是 Y combinator:

(define Y
  (λ (f)
    (λ ($k1)
      (((λ (x) (λ ($k2)
                 ($k2 (λ (n) (λ ($k3) ((x x)
                                       (λ ($r2) ((f $r2) (λ ($r1) (($r1 n) $k3))))))))))
        (λ (x) (λ ($k4)
                 ($k4 (λ (n) (λ ($k5) ((x x)
                                       (λ ($r4) ((f $r4) (λ ($r3) (($r3 n) $k5)))))))))))
       $k1))))

现在让我们算一算 (fact 10)

((Y fa♂)
 (lambda (f)
   ((f 10)
    (lambda (x) x))))
=> 3628800

3.3. 一把锋利的剪刀 – call/cc

如果你学过 Scheme 或读了上面我列出的文章,现在你应该对 call/cc 是什么有了一定的了解,偷懒起见这里我就不介绍 call/cc 了。如果你对 call/cc 不感兴趣可以跳过这一节,我之所以要写只是为了补完我的 Scheme 知识。

我将这一节的标题起为“一把锋利的剪刀”是因为我觉得 call/cc 的作用就是“剪断”代码:

(define (a c)
  (if (call/cc
       (lambda (k) (c k)))
      1
      2))

(a (λ (k) (k #t))) => 1
(a (λ (k) (k #f))) => 2

我们完全可以使用普通的 procedure 来实现上面的代码,就像这样:

(define (a c)
  (c (λ (k)
       (if k
           1
           2))))

(a (λ (k) (k #t))) => 1
(a (λ (k) (k #f))) => 2

call/cc 与 generator 有些类似,不过更强大,毕竟我们可以用 call/cc 来实现 generator,大约两年前我写过这样的代码:

使用 call/cc 实现 generator
;; function
(define (make-generator procedure)
  (define last-return values)
  (define last-value #f)
  (define (last-continuation _)
    (let ((result (procedure yield)))
      (last-return result)))

  (define (yield value)
    (call/cc (lambda (continuation)
               (set! last-continuation continuation)
               (set! last-value value)
               (last-return value))))

  (lambda args
    (call/cc (lambda (return)
               (set! last-return return)
               (if (null? args)
                   (last-continuation last-value)
                   (apply last-continuation args))))))

(define fib-gen
  (make-generator
   (lambda (collect)
     (let f ([a 0] [b 1])
       (collect a)
       (f b (+ a b))))))

;; macro r6rs
(define-syntax gen-gen2
  (lambda (x)
    (syntax-case x (lambda)
      [(k (lambda varlist e1 e2 ...))
       (with-syntax
        ((yed (datum->syntax #'k 'yield)))
        #`(lambda varlist
            (define store-return values)
            (define store-k (lambda x (begin e1 e2 ...)))
            (define-syntax yed
              (lambda (x)
                (syntax-case x ()
                  [(_ v1 v2 (... ...))
                   #'(call/cc
                      (lambda (k)
                        (set! store-k k)
                        (apply store-return v1 v2 (... ...) '())))])))
            (lambda resume-vals
              (call/cc (lambda (return)
                         (set! store-return return)
                         (apply store-k resume-vals))))))])))

(define gen-next (lambda (x . val) (apply x val)))

(define fib-gen2
  (gen-gen2 (lambda (x y)
              (let f ([a x] [b y])
                (yield a)
                (f b (+ a b))))))

(define fib-s (fib-gen 0 1))
(gen-next fib-s) ;call n times

当然这一节我们的重点不在于如何使用 call/cc,而是如何使用 CPS 实现 call/cc。实际上这是非常简单的,我们只需要这样就行了:

\[[[call/cc]] = λf.λk.f(λv.λk_0.kv)k\]

写成代码的话就是这样:

(CPS 'call/cc) =>
(λ (f) (λ (k) ((f (λ (v) (λ (k0) (k v)))) k)))

那么我们要怎样理解这个变换呢?call/cc 原先就接受一个过程,并将 current-continuation 作为该过程的参数,如果将它当作普通函数的话,它可能是这样的:

(define call/cc
  (lambda (f)
    (f *k*)))

(mycps4 '(λ (f) (f *k*)) 'id) =>
(id (λ (f) (λ ($k1) ((f *k*) $k1))))

(define call/cc*
  (λ (f)
    (λ (k)
      ((f *k*) k))))

我们对比一下 call/cc 的 CPS 变换和我们得到的 call/cc*,可以发现 *k* 对应于 (λ (v) (λ (k0) (k v))) ,如果正常求值的话 (λ (v) (λ (k) (k v))) 就是某一表达式的“单位” continuation,这一点很好理解,就比如我们对 1 求值:

1 =>
(((λ (k) (k 1))
  (λ (v) (λ (k) (k v))))
 id)

容易看出这个 (λ (v) (λ (k) (k v))) 中的 k 就是 current-continuation,而我们使用 call/cc 后,这个 current-continuation 要转移到 call/cc 中的 continuation 去,所以 call/cc 表达式中的 k0 被忽略了。

将上面的规则添加到 mycps 中(同时把显式的 continuation 改成隐式),我们可以得到:

(defun mycps5 (expr)
  (let ((gen (ease-gensym-gen)))
    (cl-labels
        ((T (expr k)
            (pcase expr
              ('call/cc
               (let (($f (funcall gen '$f))
                     ($k (funcall gen '$k))
                     ($v (funcall gen '$v)))
                 `(,k (λ (,$f) (λ (,$k) ((,$f (λ (,$v) (λ (_) (,$k ,$v)))) ,$k))))))
              ((pred atom) `(,k ,expr))
              (`(λ (,v) ,e)
               (let (($k (funcall gen '$k)))
                 `(,k (λ (,v) (λ (,$k) ,(T e $k))))))
              (`(,f ,e)
               (T-a f (lambda ($f)
                        (T-a e (lambda ($e)
                                 `((,$f ,$e) ,k))))))))
         (T-a (expr k)
              (pcase expr
                ('call/cc
                 (let (($f (funcall gen '$f))
                       ($k (funcall gen '$k))
                       ($v (funcall gen '$v)))
                   (funcall k `(λ (,$f) (λ (,$k) ((,$f (λ (,$v) (λ (_) (,$k ,$v)))) ,$k))))))
                ((pred atom) (funcall k expr))
                (`(λ (,v) ,e)
                 (let (($k (funcall gen '$k)))
                   (funcall k `(λ (,v) (λ (,$k) ,(T e $k))))))
                (`(,f ,e)
                 (let* (($r (funcall gen '$r))
                        (kout `(λ (,$r) ,(funcall k $r))))
                   (T-a f (lambda ($f)
                            (T-a e (lambda ($e)
                                     `((,$f ,$e) ,kout))))))))))
      `(λ (k) ,(T expr 'k)))))

我们可以用下面的例子验证一下正确性,由 elisp 生成供 racket 运行的代码(笑):

(mycps5 '(call/cc (λ (k) (k 1))))
(λ (k) (((λ ($f1) (λ ($k1) (($f1 (λ ($v1) (λ (_) ($k1 $v1)))) $k1)))
         (λ (k) (λ ($k2) ((k 1) $k2)))) k))

((λ (k) (((λ ($f1) (λ ($k1) (($f1 (λ ($v1) (λ (_) ($k1 $v1)))) $k1)))
          (λ (k) (λ ($k2) ((k 1) $k2)))) k))
 id)
=> 1

(mycps5 '((call/cc call/cc) (λ (x) 2)))
(λ (k) (((λ ($f1) (λ ($k2) (($f1 (λ ($v1) (λ (_) ($k2 $v1)))) $k2)))
         (λ ($f2) (λ ($k3) (($f2 (λ ($v2) (λ (_) ($k3 $v2)))) $k3))))
        (λ ($r1) (($r1 (λ (x) (λ ($k1) ($k1 2)))) k))))

((λ (k) (((λ ($f1) (λ ($k2) (($f1 (λ ($v1) (λ (_) ($k2 $v1)))) $k2)))
          (λ ($f2) (λ ($k3) (($f2 (λ ($v2) (λ (_) ($k3 $v2)))) $k3))))
         (λ ($r1) (($r1 (λ (x) (λ ($k1) ($k1 2)))) k))))
 id)
=> 2

(mycps5 '((call/cc call/cc) (call/cc call/cc)))
(λ (k) (((λ ($f3) (λ ($k3) (($f3 (λ ($v3) (λ (_) ($k3 $v3)))) $k3)))
         (λ ($f4) (λ ($k4) (($f4 (λ ($v4) (λ (_) ($k4 $v4)))) $k4))))
        (λ ($r1) (((λ ($f1) (λ ($k1) (($f1 (λ ($v1) (λ (_) ($k1 $v1)))) $k1)))
                   (λ ($f2) (λ ($k2) (($f2 (λ ($v2) (λ (_) ($k2 $v2)))) $k2))))
                  (λ ($r2) (($r1 $r2) k))))))

((λ (k) (((λ ($f3) (λ ($k3) (($f3 (λ ($v3) (λ (_) ($k3 $v3)))) $k3)))
          (λ ($f4) (λ ($k4) (($f4 (λ ($v4) (λ (_) ($k4 $v4)))) $k4))))
         (λ ($r1) (((λ ($f1) (λ ($k1) (($f1 (λ ($v1) (λ (_) ($k1 $v1)))) $k1)))
                    (λ ($f2) (λ ($k2) (($f2 (λ ($v2) (λ (_) ($k2 $v2)))) $k2))))
                   (λ ($r2) (($r1 $r2) k))))))
 id)
=> infinite loop...

如果再加上一些变量绑定结构,我们可以使用 call/cc 玩玩 yin-yang puzzle,这里我就不继续了。

3.4. CPS conversion – another approach

以上内容的数学公式和部分例子来自第二篇文章,CPS 变换的主体实现来自 Matt Might 的文章,完整起见这里贴一下实现,这里我就不分析了。

(define-syntax CPS
  (syntax-rules (lambda call/cc p)
    ((CPS (?e1 ?e2) . args)                     ; application
     (NORM (lambda (k) ((CPS ?e1) (lambda (f) ((CPS ?e2) (lambda (a)
                                                           ((f a) k)))))) . args))
    ((CPS (lambda (x) ?e) . args)               ; abstraction
     (NORM (lambda (k) (k (lambda (x) (CPS ?e)))) . args))
    ((CPS call/cc . args)
     (NORM (lambda (k0)
             (k0 (lambda (p) (lambda (k)
                               ((p (lambda (a) (lambda (k1) (k a)))) k)))))
           . args))
    ((CPS p . args)     ; skolem constant for *any* value
     (NORM (lambda (k) (k pv)) . args))
    ((CPS ?x . args)
     (NORM (lambda (k) (k ?x)) . args))))

(define-syntax NORM
  (syntax-rules (lambda CPS)
    ((NORM t) (NORM t () ()))
    ((NORM (CPS e) env stack) (CPS e env stack))
    ((NORM (lambda (x) e) env ())
     (let-syntax ((ren (syntax-rules ()
                         ((ren ?x ?e ?env) (lambda (x) (NORM ?e ((?x () x) . ?env) ()))))))
       (ren x e env)))
    ((NORM (lambda (x) b) env ((enve e) . stack))
     (NORM b ((x enve e) . env) stack))
    ((NORM (e1 e2) env stack)
     (NORM e1 env ((env e2) . stack)))
    ((NORM x () ()) x)
    ((NORM x () ((enve e) ...))
     (x (NORM e enve ()) ...))
    ((NORM x env stack)
     (let-syntax
         ((find
           (syntax-rules (x)
             ((find ?x ((x ?envs ?es) . _) ?stack) (NORM ?es ?envs ?stack))
             ((find ?x (_ . ?env) ?stack) (NORM ?x ?env ?stack)))))
       (find x env stack)))
    ))

3.5. 王垠的 40 行实现 CPS

可能很多同学和我一样是从 yin wang 那里知道 CPS 的,这里就着纪念的心态把他的代码搬过来,如果你对上面的内容没什么疑问的话,相信这些代码你也很容易看懂:

yin wang 40 lines
;; A simple CPS transformer which does proper tail-call and does not
;; duplicate contexts for if-expressions.

;; author: Yin Wang ([email protected])


(load "pmatch.scm")


(define cps
  (lambda (exp)
    (letrec
        ([trivial? (lambda (x) (memq x '(zero? add1 sub1)))]
         [id (lambda (v) v)]
         [ctx0 (lambda (v) `(k ,v))]      ; tail context
         [fv (let ([n -1])
               (lambda ()
                 (set! n (+ 1 n))
                 (string->symbol (string-append "v" (number->string n)))))]
         [cps1
          (lambda (exp ctx)
            (pmatch exp
              [,x (guard (not (pair? x))) (ctx x)]
              [(if ,test ,conseq ,alt)
               (cps1 test
                     (lambda (t)
                       (cond
                        [(memq ctx (list ctx0 id))
                         `(if ,t ,(cps1 conseq ctx) ,(cps1 alt ctx))]
                        [else
                         (let ([u (fv)])
                           `(let ([k (lambda (,u) ,(ctx u))])
                              (if ,t ,(cps1 conseq ctx0) ,(cps1 alt ctx0))))])))]
              [(lambda (,x) ,body)
               (ctx `(lambda (,x k) ,(cps1 body ctx0)))]
              [(,op ,a ,b)
               (cps1 a (lambda (v1)
                         (cps1 b (lambda (v2)
                                   (ctx `(,op ,v1 ,v2))))))]
              [(,rator ,rand)
               (cps1 rator
                     (lambda (r)
                       (cps1 rand
                             (lambda (d)
                               (cond
                                [(trivial? r) (ctx `(,r ,d))]
                                [(eq? ctx ctx0) `(,r ,d k)]  ; tail call
                                [else
                                 (let ([u (fv)])
                                   `(,r ,d (lambda (,u) ,(ctx u))))])))))]))])
      (cps1 exp id))))




;;; tests

;; var
(cps 'x)
(cps '(lambda (x) x))
(cps '(lambda (x) (x 1)))


;; no lambda (will generate identity functions to return to the toplevel)
(cps '(if (f x) a b))
(cps '(if x (f a) b))


;; if stand-alone (tail)
(cps '(lambda (x) (if (f x) a b)))


;; if inside if-test (non-tail)
(cps '(lambda (x) (if (if x (f a) b) c d)))


;; both branches are trivial, should do some more optimizations
(cps '(lambda (x) (if (if x (zero? a) b) c d)))


;; if inside if-branch (tail)
(cps '(lambda (x) (if t (if x (f a) b) c)))


;; if inside if-branch, but again inside another if-test (non-tail)
(cps '(lambda (x) (if (if t (if x (f a) b) c) e w)))


;; if as operand (non-tail)
(cps '(lambda (x) (h (if x (f a) b))))


;; if as operator (non-tail)
(cps '(lambda (x) ((if x (f g) h) c)))


;; why we need more than two names
(cps '(((f a) (g b)) ((f c) (g d))))



;; factorial
(define fact-cps
  (cps
   '(lambda (n)
      ((lambda (fact)
         ((fact fact) n))
       (lambda (fact)
         (lambda (n)
           (if (zero? n)
               1
               (* n ((fact fact) (sub1 n))))))))))

;; print out CPSed function
(pretty-print fact-cps)
;; =>
;; '(lambda (n k)
;;    ((lambda (fact k) (fact fact (lambda (v0) (v0 n k))))
;;     (lambda (fact k)
;;       (k
;;        (lambda (n k)
;;          (if (zero? n)
;;            (k 1)
;;            (fact
;;             fact
;;             (lambda (v1) (v1 (sub1 n) (lambda (v2) (k (* n v2))))))))))
;;     k))


((eval fact-cps) 5 (lambda (v) v))
;; => 120

这里顺带附上一张解释的图以及源链接:

2.jpg
Figure 1: https://www.zhihu.com/question/20822815/answer/23890076

我写过的最满意的一个程序,自动 CPS 变换,就是在 C311 产生的。在 C311 的作业里,Friedman 经常加入一些“智力题”(brain teaser),做出来了可以加分。因为我已经有一定基础,所以我有精力来做那些智力题。开头那些题还不是很难,直到开始学 CPS 的时候,出现了这么一道:“请写出一个叫 CPSer 的程序,它的作用是自动的把 Scheme 程序转换成 CPS 形式。”那次作业的其它题目都是要求“手动”把程序变成 CPS 形式,这道智力题却要求一个“自动”的——用一个程序来转换另一个程序。

我觉得很有意思。如果能写出一个自动的 CPS 转换程序,那我岂不是可以用它完成所有其它的题目了!所以我就开始捣鼓这个东西,最初的想法其实就是“模拟”一个手动转换的过程。然后我发现这真是个怪物,就那么几十行程序,不是这里不对劲,就是那里不对劲。这里按下去一个 bug,那里又冒出来一个,从来没见过这么麻烦的东西。我就跟它死磕了,废寝忘食几乎一星期。经常走进死胡同,就只有重新开始,不知道推翻重来了多少次。到快要交作业的时候,我把它给弄出来了。最后我用它生成了所有其它的答案,产生的 CPS 代码跟手工转换出来的看不出任何区别。当然我这次我又得了满分(因为每次都做智力题,我的分数总是在 100 以上)。

作业发下来那天下课后,我跟 Friedman 一起走回 Lindley Hall(IU 计算机系的楼)。半路上他问我:“这次的 brain teaser 做了没有。”我说:“做了。这是个好东西啊,帮我把其它作业都做出来了。”他有点吃惊,又有点将信将疑的样子:“你确信你做对了?”我说:“不确信它是完全正确,但是转换后的作业程序全都跟手工做的一样。”走回办公室之后,他给了我一篇 30 多页的论文 “Representing control: a study of the CPS transformation”,作者是他的好朋友 Olivier Danvy 和 Andrzej Filinski。然后我才了解到,这是这个方向最厉害的两个人。正是这篇论文,解决了这个悬而不决十多年的难题。其实自动的 CPS 转换,可以被用于实现高效的函数式语言编译器。Princeton 大学的著名教授 Andrew Appel 写了一本书叫《Compiling with Continuations》,就是专门讲这个问题的。Amr Sabry(我现在的导师)当年的博士论文就是一个比 CPS 还要简单的变换(叫做 ANF)。凭这个东西,他几乎灭掉了这整个 CPS 领域,并且拿到了终身教授职位。在他的论文发表 10 年之内也没有 CPS 的论文出现。

GTF - Great Teacher Friedman – yin wang

4. emacs generator 的实现

(写了这么多总算是到了本文原先认定的核心内容(笑))。

emacs 的 generator 实现位于 emacs-lisp/generator.el 中,总行数刚好 800,读下来应该不会太费劲。我们从最根本的 CPS 变换函数 cps--transform-1 开始说起。

4.1. 各种各样的 CPS 变换

能在 cps--transform-1 中用 300 行完成整个语言的 CPS 变换,这足以说明 elisp 是一门非常简单的语言(当然了,不考虑各种各样的宏)。在开始之前,我首先说明一下 cps--transform-1 的工作方式,以下简称 cps--transform-1CPS 。这个函数接受一个 form 和一个 continuation 参数 next-state

(defun cps--transform-1 (form next-state)
  (pcase form
    ...))

整个 CPS 是一个巨大的 pcase 结构,它通过模式匹配来处理各种各样的语言结构,总的来说有这些:

  • (and e1 ...)
  • (catch tag . body)
  • (cond test . rest)
  • (condition-case var bodyform . handlers)
  • (if cond then else)
  • (inline form ...)
  • (progn form ...)
  • (prog1 form ...)
  • (let binding . body)
  • (let* binding . body)
  • (or e1 ...)
  • (unwind-protect bodyform . unwindforms)
  • (while test . body)
  • (quote arg)
  • (function arg)
  • (iter-yield value)
  • 函数调用

generator.el 的开头定义了四个非常重要的变量: cps--bindingscps--statescps--value-symbolcps--state-symbolCPS 会使用它们存储变换过程中得到的代码,其中 cps--bindings 存放所有的对应于代码的变量, cps--states 是状态变量与代码的 alist, cps--value-symbol 为代表当前值的符号, cps--state-symbol 为代表当前状态的符号。它们还需要经过一些处理才能变成可用的 iterator,不过我在上面写过的一个小例子可以明了地说明这些变量的作用:

;; imperative
(let ((val '())
      (state '())
      (fun1 '()))
  (set! fun1 (lambda ()
               (set! val (* val 2))
               (set! state #f)))
  (set! state (lambda ()
                (set! val (+ 1 1))
                (set! state fun1)))
  (let loop ()
    (if state (begin (state) (loop))
        val)))

这里的 valstate 对应的就是 cps--value-symbolcps--state-symbolfun1 (也许还有其他函数)的生成要用到 cps--bindingscps--states

CPS 中的 pcase 所有分支的最前面有一个检测表达式是否含有 yield 的分支,如果不含就不对整个表达式进行 CPS 变换,这样可以提高代码的执行效率,毕竟状态切换是需要时间的,尤其是 elisp 中的 iterator 实现用到了“昂贵的”函数调用。

((guard (cps--atomic-p form))
 (cps--make-atomic-state form next-state))

这里的 cps--make-atomic-state 会生成求值和状态转换表达式,并将这段代码通过 cps-add-state 添加到 cps--bindingscps-states 中:

(defun cps--make-atomic-state (form next-state)
  (let ((tform `(prog1 ,form (setf ,cps--state-symbol ,next-state))))
    (cl-loop for wrapper in cps--dynamic-wrappers
             do (setf tform (funcall wrapper tform)))
    ;; Bind cps--cleanup-function to nil here because the wrapper
    ;; function mechanism is responsible for cleanup here, not the
    ;; generic cleanup mechanism.  If we didn't make this binding,
    ;; we'd run cleanup handlers twice on anything that made it out
    ;; to toplevel.
    (let ((cps--cleanup-function nil))
      (cps--add-state "atom"
        `(setf ,cps--value-symbol ,tform)))))

(defun cps--add-state (kind body)
  "Create a new CPS state of KIND with BODY and return the state's name."
  (declare (indent 1))
  (let ((state (cps--gensym "cps-state-%s-" kind)))
    (push (list state body cps--cleanup-function) cps--states)
    (push state cps--bindings)
    state))

注意到上面生成的的代码求值,状态转移和保存表达式的值的代码了吗:

(let ((tform `(prog1 ,form (setf ,cps--state-symbol ,next-state))))
  ...)

(cps--add-state "atom"
  `(setf ,cps--value-symbol ,tform))

(let ((state (cps--gensym "cps-state-%s-" kind)))
  (push (list state body cps--cleanup-function) cps--states)
  (push state cps--bindings)
  state))

cps--make-atomic-state 中的 wrapper 我们不用太关心,默认只有 identity ,也就是什么也不做。这里的 tform 就是经过 CPS 变换的表达式,比如原先为 1,那么现在就是 (prog1 1 (setf cps--state-symbol next-state)) ,此处的 cps--state-symbol 是在一般使用中(指 iter-defuniter-lambda )通过 gensym 生成的 Symbol, next-state 就是表示 continuation 的符号。在 cps--make-atomic-state 的最后调用了 cps--add-statetfrom 添加到状态存储器中。

如果我们让 cps--atomic-p 总为假的话, cps--transform-1 会对所有(即使不含 yield )表达式进行 CPS 变换,在这一节中我通过设置 cps-inhibit-atomic-optimizationt 来禁止非 yield 优化,以此展示一些表达式经过 CPS 后的样子。

为了方便展示,这里我们定义一个宏来展示 CPS 得到的代码,因为真正重要的代码只在 cps--states 中,这里只使用 pp-to-string 打印它:

(defmacro yy-with-vals (body)
  `(let ((cps--value-symbol (make-symbol "value-symbol"))
         (cps--state-symbol (make-symbol "state-symbol"))
         (cps--states nil)
         (cps--bindings nil))
     (cps--transform-1 ',body 'next)
     (pp-to-string cps--states)))

(yy-with-vals 1)
"((cps-state-atom-15771
  (setf value-symbol
        (prog1 1
          (setf state-symbol next)))
  nil))
"

接着,通过 (setq cps-inhibit-atomic-optimization nil) ,我们可以开始观察了。

4.1.1. and

and 这个短路逻辑的处理分为三个 pcase 子句:

('(and)                             ; (and) -> t
 (cps--transform-1 t next-state))
(`(and ,condition)                  ; (and CONDITION) -> CONDITION
  (cps--transform-1 condition next-state))
(`(and ,condition . ,rest)
  ;; Evaluate CONDITION; if it's true, go on to evaluate the rest
  ;; of the `and'.
  (cps--transform-1
   condition
   (cps--add-state "and"
     `(setf ,cps--state-symbol
            (if ,cps--value-symbol
                ,(cps--transform-1 `(and ,@rest)
                                   next-state)
              ,next-state)))))

我们可以使用以下代码观察一下输出结果:

(yy-with-vals (and))
"((cps-state-atom-15772
  (setf value-symbol
        (prog1 t
          (setf state-symbol next)))
  nil))
"
(yy-with-vals (and 1))
"((cps-state-atom-15773
  (setf value-symbol
        (prog1 1
          (setf state-symbol next)))
  nil))
"
(yy-with-vals (and 1 2))
"((cps-state-atom-15770
  (setf value-symbol
        (prog1 1
          (setf state-symbol cps-state-and-15769)))
  nil)
 (cps-state-and-15769
  (setf state-symbol
        (if value-symbol cps-state-atom-15768 next))
  nil)
 (cps-state-atom-15768
  (setf value-symbol
        (prog1 2
          (setf state-symbol next)))
  nil))
"

可见 cps--transform-1 确实把 (and 1 2) 切成了三段(笑)。比较有意思的是第二段 cps-state-and-15769 ,它通过上一段的值来决定下一个 continuation。

4.1.2. or

and 类似, or 同样是三个子句。它和 and 非常相似,只是对无参和多参的处理有些细微的不同罢了,这里就不展示效果了。

('(or) (cps--transform-1 nil next-state))
(`(or ,condition) (cps--transform-1 condition next-state))
(`(or ,condition . ,rest)
  (cps--transform-1
   condition
   (cps--add-state "or"
     `(setf ,cps--state-symbol
            (if ,cps--value-symbol
                ,next-state
              ,(cps--transform-1
                `(or ,@rest) next-state))))))

4.1.3. if

if 的 CPS 变换与 andor 的多参形式非常接近:

(`(if ,cond ,then . ,else)
  (cps--transform-1 cond
                    (cps--add-state "if"
                      `(setf ,cps--state-symbol
                             (if ,cps--value-symbol
                                 ,(cps--transform-1 then
                                                    next-state)
                               ,(cps--transform-1 `(progn ,@else)
                                                  next-state))))))

这是个简单的例子:

(yy-with-vals (if (hello) (world) (no)))
"((cps-state-atom-15777
  (setf value-symbol
        (prog1
            (hello)
          (setf state-symbol cps-state-if-15776)))
  nil)
 (cps-state-if-15776
  (setf state-symbol
        (if value-symbol cps-state-atom-15774 cps-state-atom-15775))
  nil)
 (cps-state-atom-15775
  (setf value-symbol
        (prog1
            (no)
          (setf state-symbol next)))
  nil)
 (cps-state-atom-15774
  (setf value-symbol
        (prog1
            (world)
          (setf state-symbol next)))
  nil))
"

简单的 if 表达式生成了 4 段代码,可见第二段决定了第三段和第四段哪一段执行。

4.1.4. cond

cond 的实现中使用了 or ,这也是理所应当的:

('(cond)                            ; (cond) -> nil
 (cps--transform-1 nil next-state))
(`(cond (,condition) . ,rest)
 (cps--transform-1 `(or ,condition (cond ,@rest))
                   next-state))
(`(cond (,condition . ,body) . ,rest)
 (cps--transform-1 `(if ,condition
                        (progn ,@body)
                      (cond ,@rest))
                   next-state))

展开式没什么好说的。

4.1.5. progn

更没什么好说的了,就是把各部分拆开。

('(progn) (cps--transform-1 nil next-state))
(`(progn ,form) (cps--transform-1 form next-state))
(`(progn ,form . ,rest)
  (cps--transform-1 form
                    (cps--transform-1 `(progn ,@rest)
                                      next-state)))

4.1.6. let*

由于代码都给切成一段一段了,所以各种嵌套的 letlet* 表达式都应该放在 body 的最前面,同时要做好重命名避免冲突,以下是 let* 实现代码:

(`(let* () . ,body)
 (cps--transform-1 `(progn ,@body) next-state))

(`(let* (,binding . ,more-bindings) . ,body)
 (let* ((var (if (symbolp binding) binding (car binding)))
        (value-form (car (cdr-safe binding)))
        (new-var (cps--add-binding var)))

   (cps--transform-1
    value-form
    (cps--add-state "let*"
      `(setf ,new-var ,cps--value-symbol
             ,cps--state-symbol
             ,(if (or (not lexical-binding) (special-variable-p var))
                  (cps--with-dynamic-binding var new-var
                    (cps--transform-1
                     `(let* ,more-bindings ,@body)
                     next-state))
                (cps--transform-1
                 (cps--replace-variable-references
                  var new-var
                  `(let* ,more-bindings ,@body))
                 next-state)))))))

由于是个递归实现,我们只需要分析中间步骤就好。这里的 new-var 就是生成了新的符号,并添加到了 cps--bindings 中:

(defun cps--add-binding (original-name)
  (car (push (cps--gensym (format "cps-binding-%s-" original-name))
             cps--bindings)))

在绑定好 new-var (指上面的 setf )后,代码会根据变量是否动态绑定来确定使用哪一种绑定,这里就体现出动态绑定和静态绑定的区别了。如果是动态绑定, cps--transform-1 会调用 cps--with-dynamic-binding 处理动态绑定变量,如果是静态绑定就对代码中的对应变量进行替换使用新的名字。这里我就不分析实现细节了,直接上例子:

(yy-with-vals (let* ((a 1)) a))
"((cps-state-atom-15781
  (setf value-symbol
        (prog1 1
          (setf state-symbol cps-state-let*-15780)))
  nil)
 (cps-state-let*-15780
  (setf cps-binding-a-15778 value-symbol state-symbol cps-state-atom-15779)
  nil)
 (cps-state-atom-15779
  (setf value-symbol
        (prog1 cps-binding-a-15778
          (setf state-symbol next)))
  nil))
"

4.1.7. let

let 的实现直接使用了 let* ,不过添加了一层包装:

(`(let ,bindings . ,body)
 (let* ((bindings (cl-loop for binding in bindings
                           collect (if (symbolp binding)
                                       (list binding nil)
                                     binding)))
        (temps (cl-loop for (var _value-form) in bindings
                        collect (cps--add-binding var))))
   (cps--transform-1
    `(let* ,(append
             (cl-loop for (_var value-form) in bindings
                      for temp in temps
                      collect (list temp value-form))
             (cl-loop for (var _binding) in bindings
                      for temp in temps
                      collect (list var temp)))
       ,@body)
    next-state)))

既然 let* 就是按顺序的 let ,那为什么不直接使用 let* 而是要加一层呢?这应该是出于调试方便考虑的,先使用临时变量对 let binding 中的各项逐项求值,这样在 binding 值用到不该出现的 let 变量时就会报错。不过这样一来单个 let 表达式就要用到两倍的变量 binding,无伤大雅就是了。

(yy-with-vals (let ((a 1)) a))
"((cps-state-atom-15793
  (setf value-symbol
        (prog1 1
          (setf state-symbol cps-state-let*-15792)))
  nil)
 (cps-state-let*-15792
  (setf cps-binding-cps-binding-a-15786-15787 value-symbol state-symbol cps-state-atom-15791)
  nil)
 (cps-state-atom-15791
  (setf value-symbol
        (prog1 cps-binding-cps-binding-a-15786-15787
          (setf state-symbol cps-state-let*-15790)))
  nil)
 (cps-state-let*-15790
  (setf cps-binding-a-15788 value-symbol state-symbol cps-state-atom-15789)
  nil)
 (cps-state-atom-15789
  (setf value-symbol
        (prog1 cps-binding-a-15788
          (setf state-symbol next)))
  nil))
"

现在有 5 段代码了,而且中间可以清晰地看到 cps-binding-cps-binding ,这也就说明依照临时变量的名字再次生成了符号。

4.1.8. while

while 的 CPS 变换直接消除了 while 表达式,将迭代变成了对某个 thunk 的反复调用,这实现还挺妙的:

(`(while ,test . ,body)
      ;; Open-code state addition instead of using cps--add-state: we
      ;; need our states to be self-referential. (That's what makes the
      ;; state a loop.)
      (let* ((loop-state
                (cps--gensym "cps-state-while-"))
             (eval-loop-condition-state
              (cps--transform-1 test loop-state))
             (loop-state-body
                `(progn
                   (setf ,cps--state-symbol
                         (if ,cps--value-symbol
                             ,(cps--transform-1
                               `(progn ,@body)
                               eval-loop-condition-state)
                           ,next-state)))))
        (push (list loop-state loop-state-body cps--cleanup-function)
              cps--states)
        (push loop-state cps--bindings)
        eval-loop-condition-state))

我们用一个非常简单的死循环来看看它的展开式:

(yy-with-vals (while t 1))
"((cps-state-while-15794
  (progn
    (setf state-symbol
          (if value-symbol cps-state-atom-15796 next)))
  nil)
 (cps-state-atom-15796
  (setf value-symbol
        (prog1 1
          (setf state-symbol cps-state-atom-15795)))
  nil)
 (cps-state-atom-15795
  (setf value-symbol
        (prog1 t
          (setf state-symbol cps-state-while-15794)))
  nil))
"

因为实现原因这里得到的列表序号就不是单调的了,不过这无关紧要,我们可以看到 15794 决定了循环是否能继续进行,以及 15796 的 continuation 是 15795 ,即计算条件表达式。

4.1.9. quote

这一段十分简单,我只列代码:

(`(quote ,arg) (cps--add-state "quote"
                 `(setf ,cps--value-symbol (quote ,arg)
                        ,cps--state-symbol ,next-state)))
(`(function ,arg) (cps--add-state "function"
                    `(setf ,cps--value-symbol (function ,arg)
                           ,cps--state-symbol ,next-state)))

以上列出的变换并不是 cps--transform-1 的全部,还有一些比较麻烦的控制结构( catch, unwind-protect, condition-case )我没有讲解,读者有兴趣可以自己去看看。

4.2. 构建 generator

我们已经明白了上面四个变量的作用,以及它们的值是如何通过调用 CPS 生成的,现在让我们看看 cps-generate-evaluator 这个函数,不论是 iter-lambdaiter-make 还是 iter-defun ,这些创建 generator/iterator 的宏在内部都对 body 调用了它来生成可用的代码。

首先 cps-generate-evaluator 会为一些符号创建 binding,并调用 cps--transform-1 来获取经过 CPS 变换的代码,然后将它的返回值(第一个状态的符号)绑定到变量 initial-state 上:

(let* (cps--states
       cps--bindings
       cps--cleanup-function
       (cps--value-symbol (cps--gensym "cps-current-value-"))
       (cps--state-symbol (cps--gensym "cps-current-state-"))
       ;; We make *cps-cleanup-table-symbol** non-nil when we notice
       ;; that we have cleanup processing to perform.
       (cps--cleanup-table-symbol nil)
       (terminal-state (cps--add-state "terminal"
                         `(signal 'iter-end-of-sequence
                                  ,cps--value-symbol)))
       (initial-state (cps--transform-1
                       (macroexpand-all
                        `(cl-macrolet
                             ((iter-yield (value)
                                          `(cps-internal-yield ,value)))
                           ,@body)
                        macroexpand-all-environment)
                       terminal-state))
       ...)
  ...)

接着,该函数根据 cps--bindings 中的状态名和其他符号创建 let 结构,这里面几乎包含了 iterator 的全部状态变量;在 binding 创建完成后,该函数将各个状态的代码通过无参 lambda 包装成一个个 thunk ,表示这些代码在当前环境中求值;最后它将 cps--state-symbol 设置为最初状态,这也是 CPS 的返回值:

`(let ,(append (list cps--state-symbol cps--value-symbol)
               (when cps--cleanup-table-symbol
                 (list cps--cleanup-table-symbol))
               (when finalizer-symbol
                 (list finalizer-symbol))
               (nreverse cps--bindings))
   ;; Order state list so that cleanup states are always defined
   ;; before they're referenced.
   ,@(cl-loop for (state body cleanup) in (nreverse cps--states)
              collect `(setf ,state (lambda () ,body))
              when cleanup
              do (cl-assert cps--cleanup-table-symbol)
              and collect `(push (cons ,state ,cleanup) ,cps--cleanup-table-symbol))
   (setf ,cps--state-symbol ,initial-state)
   ...)

随后 cps-generate-evaluator 会根据这段变换后的代码生成一个 interator:

(let ((iterator
       (lambda (op value)
         (cond
          ,@(when finalizer-symbol
              `(((eq op :stash-finalizer)
                 (setf ,finalizer-symbol value))
                ((eq op :get-finalizer)
                 ,finalizer-symbol)))
          ((eq op :close)
           ,(cps--make-close-iterator-form terminal-state))
          ((eq op :next)
           (setf ,cps--value-symbol value)
           (let ((yielded nil))
             (unwind-protect
                 (prog1
                     (catch 'cps--yield
                       (while t
                         (funcall ,cps--state-symbol)))
                   (setf yielded t))
               (unless yielded
                 ;; If we're exiting non-locally (error, quit,
                 ;; etc.)  close the iterator.
                 ,(cps--make-close-iterator-form terminal-state)))))
          (t (error "Unknown iterator operation %S" op))))))
  ;; let's body form
  )

注意到 iterator 接受两个参数,分别是 opvalueop 表示 iterator 接受的操作类型, value 则是 caller 向 iterator 传递的值,通过 iter-next 的代码我们可知 next 操作就是传递 :next 符号:

(defun iter-next (iterator &optional yield-result)
  "Extract a value from an iterator.
YIELD-RESULT becomes the return value of `iter-yield' in the
context of the generator.

This routine raises the `iter-end-of-sequence' condition if the
iterator cannot supply more values."
  (funcall iterator :next yield-result))

注意 iterator 的 op:next 的部分:

((eq op :next)
 (setf ,cps--value-symbol value)
 (let ((yielded nil))
   (unwind-protect
       (prog1
           (catch 'cps--yield
             (while t
               (funcall ,cps--state-symbol)))
         (setf yielded t))
     (unless yielded
       ;; If we're exiting non-locally (error, quit,
       ;; etc.)  close the iterator.
       ,(cps--make-close-iterator-form terminal-state)))))

根据这里来看,generator 通过 throw 非局部跳出到 cps--yield 实现了 yield 语义。这也就是说原先 body 中的 yield 部分代码应该被转换成了带有 throw 的结构。以下代码是 CPS 中的 yield 分支对应代码:

(`(cps-internal-yield ,value)
 (cps--transform-1
  value
  (cps--add-state "iter-yield"
    `(progn
       (setf ,cps--state-symbol
             ,(if cps--cleanup-function
                  (cps--add-state "after-yield"
                    `(setf ,cps--state-symbol ,next-state))
                next-state))
       (throw 'cps--yield ,cps--value-symbol)))))

4.3. 一些高级结构

iter-do 本质上就是个循环,只不过外面套了一层 condition-case 来捕获 iter-end-of-sequence signal:

(while
    (let ((,var
           (condition-case ,condition-symbol
               (iter-next ,it-symbol)
             (iter-end-of-sequence
              (setf ,result-symbol (cdr ,condition-symbol))
              (setf ,done-symbol t)))))
      (unless ,done-symbol
        ,@body
        ;; Loop until done-symbol is set.
        t)))

从代码内容来看,generator.el 还提供了一些文档中没有记录的函数和宏,我们可以使用 iter-make 直接创建 iterator 而不用先调用一次 generator,generator 还提供了一个空迭代器 iter-empty

对 generator 实现的分析到这里就结束了,分析的内容比较简单,没有涉及到清理相关的代码。希望这一节能帮助读者更好地理解 generator 是怎么实现的。

5. 后记

这文章写得老费劲了,为了用好 CPS 变换后的 Y combinator 我调了一下午的代码,最后发现某个嵌套层次出了问题,这里不得不感谢梨梨喵CPS 变换与 CPS 变换编译,其中的例子给了我很大的启发。

自从我了解到 CPS 变换到现在大概有个两三年了,通过阅读 emacs 的 generator 实现我也总算是能够写个玩具 CPS 变换出来了,也算是了却了一桩心愿(笑)。

Thanks for reading~