Skip to main content

Python3.11 介绍

wKevin

Python 3.11 于 2022.10.24 发布正式版 3.11.0,历经 1 年开发和测试:

  • 2021.10 - 2022.04:发布 7 个 a 版本;
  • 2022.05 - 2022.07:发布 5 个 b 版本;
  • 2022.8.8:rc1
  • 2022.9.12:rc2
  • 2022.10.24:3.11.0

重点新特性:

  • Runtime、解释器
    • Faster Cpython
    • bytecode 中添加偏移量对应关系,以便 traceback 中指明出错位置
  • 语言(语法、词法)
    • 新增“异常组”:ExceptionGroup & try...except*
  • 标准库
    • 新增 tomllib 模块
    • asyncio 模块
      • 新增“任务组”:asyncio.TaskGroup
      • Task 新增 cancelling(), uncancel() 方法
    • inspect 模块
      • 新增 getmembers_static()
    • dataclasses 模块
    • typing 模块
      • 增加 typing.TypeVarTuple
      • 增加 typing.Self
      • 增加 typing.LiteralString
      • 增加 typing.TypedDict

本文来导读一下这个刚刚正式揭开面纱的新版本。

Refer

本文内容大多源自下面文档,及其文档中的相关链接文档。

  1. Python 3.11.0 Release: 2022.10.24
  2. What’s New In Python 3.11Changelog
  3. PEP 664 - 3.11 Release Schedule

Runtime、解释器

Faster CPython

2020 年底一位普通 python 开发者 Mark 提出了此计划,2021 年 5 月得到 python 之父 Guio 的加入,微软投资成立了一个专门小组……小组成功落实到 python3.11 中,Mark 也被微软聘用了。

此计划的主要工作内容有:

  • PEP659:python 解释器用专业化(spectifaction)指令自适应替换原(高频)指令。
  • 运行时更快的加载。
  • 减小函数调用的开销:堆栈帧(frame)使用更少的内存和更快的设计。
  • “零开销”异常处理。

号称:Python 3.11 比 Python 3.10 快 10-60%,平均 25%。3.11.0 只是一个开始,后续版本会持续发力。

PEP 659 -- Draft

为“专业化”指令开发“自适应”形式,称之为:自适应指令。一旦代码中指令执行了足够多的次数,该指令就会被“专业化”,即替换为更快的、新的、自适应指令。—— 此过程称之为“加速(Quickening)”。

与现有的 bytecode 相比,quickening 的优点:

  • 可以运行时更改:指令替换是运行时自动发生的
  • 可以开发出 super-instructions:跨多行、多操作数
  • 不需要处理 tracing,因为它可以退回到 bytecode 执行 tracing

eg: LOAD_ATTR 替换为 LOAD_ATTR_ADAPTIVE

加载过程加快

Python 将 bytecode 缓存在 __pycache__ 目录下,,以加速模块加载。

  • 3.10:
    Read __pycache__ -> Unmarshal -> Heap allocated code object -> Evaluate
  • 3.11: core 模块被 frozen(code object & bytecode 被静态缓存)
    Statically allocated code object -> Evaluate

实测:

(venv310) $ time python -c "pass"
python -c "pass" 0.01s user 0.00s system 98% cpu 0.019 total
(venv310) $ time python -c "import os"
python -c "import os" 0.02s user 0.01s system 98% cpu 0.028 total
(venv311) $ time python -c "pass"
python -c "pass" 0.02s user 0.00s system 98% cpu 0.017 total
(venv311) $ time python -c "import os"
python -c "import os" 0.01s user 0.01s system 97% cpu 0.022 total

一点点提升,不是很明显,多次执行还有反复。

运行速度加快

函数调用时 cpython 会创建 frame 保存运行信息,frame 在 3.11 中的改变:

  • Streamlined the frame creation process.
  • Avoided memory allocation by generously re-using frame space on the C stack.
  • Streamlined the internal frame struct to contain only essential information. Frames previously held extra debugging and memory management information.

PEP 657 -- traceback 中包含细粒度错误信息

之前的编译保留了字节码到行号的映射关系,但无法映射到哪个表达式,python11 中添加了行号和偏移量(新增 co_positions 属性)用以在 traceback 中提供更方便的定位信息。

x = {}
x["a"] = 0
x["a"]["b"]["c"] = 1
  • py10
    $ python310 main.py
    Traceback (most recent call last):
    File "/home/me/py311/main.py", line 3, in <module>
    x["a"]["b"]["c"] = 1
    TypeError: 'int' object is not subscriptable
  • py311
    $ python311 main.py
    Traceback (most recent call last):
    File "/home/me/py311/main.py", line 3, in <module>
    x["a"]["b"]["c"] = 1
    ~~~~~~^^^^^
    TypeError: 'int' object is not subscriptable
    ~~~~~~^^^^^ 为新增提示。

函数调用链中也能突出显示:

def foo():
x = {}
x["a"] = 0
x["a"]["b"]["c"] = 1


def bar():
print("foo: %s" % foo())

bar()
  • py10
    Traceback (most recent call last):
    File "/home/me/py311/main.py", line 11, in <module>
    bar()
    File "/home/me/py311/main.py", line 8, in bar
    print("foo: %s" % foo())
    File "/home/me/py311/main.py", line 4, in foo
    x["a"]["b"]["c"] = 1
    TypeError: 'int' object is not subscriptable
  • py311
    Traceback (most recent call last):
    File "/home/me/py311/main.py", line 11, in <module>
    bar()
    File "/home/me/py311/main.py", line 8, in bar
    print("foo: %s" % foo())
    ^^^^^
    File "/home/me/py311/main.py", line 4, in foo
    x["a"]["b"]["c"] = 1
    ~~~~~~^^^^^
    TypeError: 'int' object is not subscriptable
    foo()["b"] 下面都给出了提示。

语言(语法、词法)

PEP 654 -- Exception Groups and except*

痛点

  • 异步库并发多任务引发的连锁异常没有办法整合、汇聚、关联。
  • 相同操作多次尝试(如 socket.create_connection 多次连接都失败才停止)中的多个异常如何整合?
  • 多个回调函数中若有异常的策略:如 atexit.registry() 允许用户注册多个退出回调函数,退出时依次运行,但若其中多个抛了异常,则用户只会拿到最后 1 个。很多 3rd 库也是这个策略,如 Pytest 在 teardown 中注册 finalizers。
  • 复杂计算中的异常通常难以定位。
  • 装饰器(wrapper)中的异常会屏蔽掉被封装体的信息。

解决方案

3.11 新增了 ExceptionGroup & BaseExceptionGroup

exception ExceptionGroup(msg, excs)
exception BaseExceptionGroup(msg, excs)

他俩在整个标准库内建异常中的继承关系是:

BaseException
├── BaseExceptionGroup
├── GeneratorExit
├── KeyboardInterrupt
├── SystemExit
└── Exception
├── ArithmeticError
│ ├── FloatingPointError
│ ├── OverflowError
│ └── ZeroDivisionError
├── AssertionError
├── AttributeError
├── BufferError
├── EOFError
├── ExceptionGroup [BaseExceptionGroup]
├── ......

创建

import traceback
try:
raise ExceptionGroup(
"one",
[
TypeError(1),
ExceptionGroup("two", [TypeError(2), ValueError(3)]),
ExceptionGroup("three", [OSError(4)]),
],
)
except ExceptionGroup as eg:
print(eg) # output: one (3 sub-exceptions)
print(eg.message) # output: one
traceback.print_exception(eg)
# output:
# + Exception Group Traceback (most recent call last):
# | File "/home/me/py311/main.py", line 20, in <module>
# | raise ExceptionGroup(
# | ExceptionGroup: one (3 sub-exceptions)
# +-+---------------- 1 ----------------
# | TypeError: 1
# +---------------- 2 ----------------
# | ExceptionGroup: two (2 sub-exceptions)
# +-+---------------- 1 ----------------
# | TypeError: 2
# +---------------- 2 ----------------
# | ValueError: 3
# +------------------------------------
# +---------------- 3 ----------------
# | ExceptionGroup: three (1 sub-exception)
# +-+---------------- 1 ----------------
# | OSError: 4
# +------------------------------------

提取子集:subgroup

subeg = eg.subgroup(lambda e: isinstance(e, TypeError))
traceback.print_exception(subeg)
# output:
# + Exception Group Traceback (most recent call last):
# | File "/home/me/py311/main.py", line 20, in <module>
# | raise ExceptionGroup(
# | ExceptionGroup: one (2 sub-exceptions)
# +-+---------------- 1 ----------------
# | TypeError: 1
# +---------------- 2 ----------------
# | ExceptionGroup: two (1 sub-exception)
# +-+---------------- 1 ----------------
# | TypeError: 2
# +------------------------------------

或者直接写类型

subeg = eg.subgroup(ValueError)
traceback.print_exception(subeg)
# output:
# + Exception Group Traceback (most recent call last):
# | File "/home/me/py311/main.py", line 20, in <module>
# | raise ExceptionGroup(
# | ExceptionGroup: one (1 sub-exception)
# +-+---------------- 1 ----------------
# | ExceptionGroup: two (1 sub-exception)
# +-+---------------- 1 ----------------
# | ValueError: 3
# +------------------------------------

还可以传入元祖集合

subeg = eg.subgroup((OSError, ValueError))
traceback.print_exception(subeg)
# output:
# + Exception Group Traceback (most recent call last):
# | File "/home/me/py311/main.py", line 20, in <module>
# | raise ExceptionGroup(
# | ExceptionGroup: one (2 sub-exceptions)
# +-+---------------- 1 ----------------
# | ExceptionGroup: two (1 sub-exception)
# +-+---------------- 1 ----------------
# | ValueError: 3
# +------------------------------------
# +---------------- 2 ----------------
# | ExceptionGroup: three (1 sub-exception)
# +-+---------------- 1 ----------------
# | OSError: 4
# +------------------------------------

拆分成 2 个子集:split

subeg1, subeg2 = eg.split(lambda e: isinstance(e, TypeError))
traceback.print_exception(subeg1) # 与上面 subeg 相同
traceback.print_exception(subeg2) # 其余 2 个异常

创建子类:derive

subgroupsplit 内部调用的是一个名为 derive 的方法,该方法可以创建一个子类(与原类有相同的 message__traceback____cause__, __context__ and __notes__),但会重新做一些封装,我们也可以手工调用 derive 创建自己的子类,或在继承类中重载此方法。

class BaseExceptionGroup(BaseException, Generic[_BaseExceptionT_co]):
def __new__(cls: type[Self], __message: str, __exceptions: Sequence[_BaseExceptionT_co]) -> Self:
...

class ExceptionGroup(BaseExceptionGroup[_ExceptionT_co], Exception):
def __new__(cls: type[Self], __message: str, __exceptions: Sequence[_ExceptionT_co]) -> Self:
...

注:3.11 的最终实现与 PEP654 中不同。

  • BaseExceptionGroupExceptionGroup 没有定义 __init__,仅定义了 __new__
  • __new__(cls,...) -> Self 实例化,入参是对象,返回该对象的实例
  • __init__(self,...) 初始化,self 即使用 __new__ 创建的实例
  • python 把实例化和初始化分开,利于我们更灵活的操作,比如:可以重载 __new__ 方便的实现“单例类”
// python 源码: Lib/Objects/exceptions.c
static PyObject *
BaseExceptionGroup_derive(PyObject *self_, PyObject *args)
{
PyBaseExceptionGroupObject *self = _PyBaseExceptionGroupObject_cast(self_);
PyObject *excs = NULL;
if (!PyArg_ParseTuple(args, "O", &excs)) {
return NULL;
}
PyObject *init_args = PyTuple_Pack(2, self->msg, excs);
if (!init_args) {
return NULL;
}
PyObject *eg = PyObject_CallObject(
PyExc_BaseExceptionGroup, init_args);
Py_DECREF(init_args);
return eg;
}

我们在 python 层重载基本 1 行就能达到相同的效果了:

class MyExceptionGroup(ExceptionGroup):
def derive(self, exc):
return MyGroup(self.message, exc)

目前感觉创建子类和创作 derive 方法用处不大,不多说了。

异常处理

捕获到异常组后,我们可能对这些内容感兴趣:

  1. 组里有多少个异常,树形结构如何?
  2. 每个叶子是一个具体的异常,如何提取?
  3. 异常组(树)中是否有重复异常,我可不希望重复处理?

所以首要任务是提取出所有叶子节点的异常:

def leaf_generator(exc, tbs=None):
if tbs is None:
tbs = []

tbs.append(exc.__traceback__)
if isinstance(exc, BaseExceptionGroup):
for e in exc.exceptions:
yield from leaf_generator(e, tbs)
else:
yield exc, tbs
tbs.pop()
def foo(v):
try:
raise ValueError(v)
except Exception as e:
return e

def bar():
raise ExceptionGroup("eg", [foo(1), foo(2)])

try:
bar()
except BaseException as e:
eg = e

for (i, (exc, tbs)) in enumerate(leaf_generator(eg)):
print(f"\n=== Exception #{i+1}:")
traceback.print_exception(exc)
print(f"The complete traceback for Exception #{i+1}:")
for tb in tbs:
traceback.print_tb(tb)

# output:
# === Exception #1:
# Traceback (most recent call last):
# File "/home/me/py311/main.py", line 51, in foo
# raise ValueError(v)
# ValueError: 1
# The complete traceback for Exception #1:
# File "/home/me/py311/main.py", line 61, in <module>
# bar()
# File "/home/me/py311/main.py", line 57, in bar
# raise ExceptionGroup("eg", [foo(1), foo(2)])
# File "/home/me/py311/main.py", line 51, in foo
# raise ValueError(v)
#
# === Exception #2:
# Traceback (most recent call last):
# File "/home/me/py311/main.py", line 51, in foo
# raise ValueError(v)
# ValueError: 2
# The complete traceback for Exception #2:
# File "/home/me/py311/main.py", line 61, in <module>
# bar()
# File "/home/me/py311/main.py", line 57, in bar
# raise ExceptionGroup("eg", [foo(1), foo(2)])
# File "/home/me/py311/main.py", line 51, in foo
# raise ValueError(v)

try...except*

try:
...
except* SpamError:
...
except* FooError as e:
...
except* (BarError, BazError) as e:
...

except* 表示每个子句可以处理多个异常,可以匹配异常组,及其子组。当匹配子组后,剩余的异常有后续的 except* 匹配。

所以,一个异常组可能导致多个 except* 被命中,但每次命中只会执行一次。

try:
raise ExceptionGroup(
"one",
[
TypeError(1),
ExceptionGroup("two", [TypeError(2), ValueError(3)]),
ExceptionGroup("three", [OSError(4)]),
],
)
except* (ValueError, OSError) as eg:
# 只会命中并执行一次,不会进来两次
# 原理是:eg = origin_eg.subgroup((ValueError, OSError))
print(f"ValueError and OSError: {eg!r}" )
except* TypeError as eg:
print(f"TypeError: {eg!r}" )

# output:
# ValueError and OSError: ExceptionGroup('one', [ExceptionGroup('two', [ValueError(3)]), ExceptionGroup('three', [OSError(4)])])
# TypeError: ExceptionGroup('one', [TypeError(1), ExceptionGroup('two', [TypeError(2)])])

Naked Exception

python 将 raise 后面没有具体异常的用法称为裸异常。裸异常通常在 except 中再次抛出:

try:
try:
raise BlockingIOError
except Exception as e:
print(repr(e))
raise
except Exception as e:
print(repr(e))
# output:
# BlockingIOError()
# BlockingIOError()

但此时即使不是裸异常,输出也是一样的:

try:
try:
raise BlockingIOError
except Exception as e:
print(repr(e))
raise e
except Exception as e:
print(repr(e))
# output:
# BlockingIOError()
# BlockingIOError()

那区别是是什么?—— 裸异常在 except 中二次引发异常时,raiseraise e 并不等效:

  • raise: 不会将当前 frame 添加到调用栈里。
  • raise e: 会添加。
def foobar():
try:
1 / 0
except ZeroDivisionError:
raise

foobar()
# output:
# Traceback (most recent call last):
# File "/home/me/py311/main.py", line 146, in <module>
# foobar()
# File "/home/me/py311/main.py", line 141, in foobar
# 1 / 0
# ~~^~~
# ZeroDivisionError: division by zer
def foobar():
try:
1 / 0
except ZeroDivisionError as e:
raise e

foobar()
# output:
# Traceback (most recent call last):
# File "/home/me/py311/main.py", line 146, in <module>
# foobar()
# File "/home/me/py311/main.py", line 143, in foobar
# raise e
# File "/home/me/py311/main.py", line 141, in foobar
# 1 / 0
# ~~^~~
# ZeroDivisionError: division by zero

说回异常组,当在 try...except* 中捕获到一般异常,而不是异常组时,此时的一般异常对于 except* 来说也可以视为裸异常。

try:
raise BlockingIOError
except* OSError as eg: # BlockingIOError 是 OSError 的子类
print(repr(eg))
# output:
# ExceptionGroup('', (BlockingIOError(),))

当用 except* 捕获到裸异常,则生成一个 message 为空的 ExceptionGroup("", <普通异常>)

except* 中引发异常

我们继续来研究二次抛出异常的问题,看看异常组是如何处理的。

  • 隐式抛出(二次抛出裸异常)
try:
try:
raise ExceptionGroup(
"eg",
[
ValueError(1),
TypeError(2),
OSError(3),
ExceptionGroup("nested", [ValueError(4), TypeError(5), OSError(6)]),
],
)
except* ValueError as e:
print(f"*ValueError: {e!r}") # [1,[4]]
raise # 隐式抛出(二次抛出裸异常)
except* OSError as e:
print(f"*OSError: {e!r}") # [3,[6]]
except ExceptionGroup as e:
print(repr(e)) # [1,2,[4,5]]
# output:
# *ValueError: ExceptionGroup('eg', [ValueError(1), ExceptionGroup('nested', [ValueError(4)])])
# *OSError: ExceptionGroup('eg', [OSError(3), ExceptionGroup('nested', [OSError(6)])])
# ExceptionGroup('eg', [ValueError(1), TypeError(2), ExceptionGroup('nested', [ValueError(4), TypeError(5)])])
  • 显式抛出(二次抛出非裸异常)
try:
try:
raise ExceptionGroup(
"eg",
[
ValueError(1),
TypeError(2),
OSError(3),
ExceptionGroup("nested", [ValueError(4), TypeError(5), OSError(6)]),
],
)
except* ValueError as e:
print(f"*ValueError: {e!r}") # [1,[4]]
raise e # 显式抛出(二次抛出非裸异常)
except* OSError as e:
print(f"*OSError: {e!r}") # [3,[6]]
except ExceptionGroup as e:
print(repr(e)) # [1,[4]],[2,[5]]
# output:
# *ValueError: ExceptionGroup('eg', [ValueError(1), ExceptionGroup('nested', [ValueError(4)])])
# *OSError: ExceptionGroup('eg', [OSError(3), ExceptionGroup('nested', [OSError(6)])])
# ExceptionGroup('', [
# ExceptionGroup('eg', [
# ValueError(1),
# ExceptionGroup('nested', [
# ValueError(4)
# ])
# ]),
# ExceptionGroup('eg', [
# TypeError(2),
# ExceptionGroup('nested', [TypeError(5)])
# ])
# ])
  • 链式抛出(Chaining):捕获到异常后,重新抛出一个新异常。
try:
try:
raise ExceptionGroup("eg", [ValueError("a")])
except* ValueError as e:
raise KeyError(e) # 另一种写法:
except ExceptionGroup as e:
print(repr(e))
# output:
# ExceptionGroup('', [KeyError(ExceptionGroup('eg', [ValueError('a')]))])
traceback.print_exception(e)
# output:
# | ExceptionGroup: (1 sub-exception)
# +-+---------------- 1 ----------------
# | Exception Group Traceback (most recent call last):
# | File "/home/me/py311/main.py", line 142, in <module>
# | raise ExceptionGroup("eg", [ValueError("a")])
# | ExceptionGroup: eg (1 sub-exception)
# +-+---------------- 1 ----------------
# | ValueError: a
# +------------------------------------
# |
# | During handling of the above exception, another exception occurred:
# |
# | Traceback (most recent call last):
# | File "/home/me/py311/main.py", line 144, in <module>
# | raise KeyError(e)
# | KeyError: ExceptionGroup('eg', [ValueError('a')])

总结

  • 隐式抛出得到 [1,2,[4,5]],显式抛出得到 [1,[4]],[2,[5]]
  • 最外层的异常捕获在做“聚合”操作:重新抛弃的异常和未处理的异常被聚合在一起。
    • 隐式抛出的聚合结构与原始异常组一致。
    • 显式抛出的聚合结构是将被抛异常与未处理异常做拼接,得到新的异常组。
  • 链式抛出:在恰当的位置得到 During handling of the above exception, another exception occurred: 的提示。
  • except* 中构造而异常组是个临时变量,不会影响原异常组的结构。

禁止 except & except* 混用

flake8 已经能报错:

SyntaxError: cannot have both 'except' and 'except*' on the same 'try'flake8(E999)

捕获 ExceptionGroupBaseExceptionGroup 使用 except 而不能用 except*

flake8 & pylint 当前无法静态检查,但运行时会报错:

except* ExceptionGroup:
...
# Traceback (most recent call last):
# File "/home/me/py311/main.py", line 152, in <module>
# except* ExceptionGroup:
# TypeError: catching ExceptionGroup with except* is not allowed. Use except instead.

禁止使用 except*: 空匹配

flake8 已经能报错:

SyntaxError: expected one or more exception typesflake8(E999)

标准库

8 行代码找到 3.11 标准库的变更:

# python3.10 REPL - pickling module names
>>> import sys
>>> with open("mods", "wb") as f:
... pickle.dump(sys.stdlib_module_names, f)

# python3.11 REPL - compare
>>> import sys
>>> with open("mods", "rb") as f:
... mods_310 = pickle.load(f)

# added
>>> sys.stdlib_module_names - mods_310
frozenset({'_typing', '_scproxy', '_tokenize', 'tomllib'})
# removed
>>> mods_310 - sys.stdlib_module_names
frozenset({'binhex'})

tomllib: PEP 680

标准库新增了 TOML 配置文件格式的解析库。虽然 toml 格式已经存在了十多年,但 python 并没有引入标准库。

python 同时也表明以后包或项目的元数据 TOML 将作为首选格式。

pytp 的 pyproject.toml:

[build-system]
requires = ["setuptools", "wheel"]

[tool.black]`
line-length = 88`
target-version = ['py37']

[tool.flake8]
line-length = 88

对于 [foo.bar] 是拆分为 2 个索引的。

import tomllib

with open("pyproject.toml", "rb") as f:
data = tomllib.load(f)
print(type(data)) # <class 'dict'>
print(data["build-system"]) # {'requires': ['setuptools', 'wheel']}
print(data["tool"])
print(data["tool"]["black"]) # {'line-length': 88, 'target-version': ['py37']}
print(data["tool"]["flake8"]) # {'line-length': 88}

asyncio.TaskGroup

新增 asyncio.TaskGroup 未来用于替代 asyncio.gather

复习 awaitable

aws 是 awaitable 的复数,asyncio 定义了 3 种 awaitable 对象:

  1. 协程对象(指调用协程函数返回的对象)
    async def foobar(i):
    return i+i
    async def main():
    await foobar(1) # 能执行,但得不到结果
    asyncio.run(main())
  2. task: 用于并行调度协程的对象,基于协程函数创建
    ...
    async def main():
    task = asyncio.create_task(foobar(1))
    await task
    print(task.result()) # 可以通过 task 得到结果,cancel等操作
    ...
  3. Futures:底层对象,表示一个异步操作的结果,基于协程函数、task、future 和 loop 创建。
    ...
    async def main():
    future = asyncio.ensure_future(foobar(1), loop=...)
    await future # 其实此处也是 task
    print(task.result())
    ...
  • Task 是 Future 的子类,所以具有 Future 的 result()done()cancell()add_done_callback 等方法。
  • ensure_future() 返回的其实也是 Task(所以我们真的很少需要直接接触 Future),其实 ensure_future() 底层也会调用 create_task(),所以 Future 相比 Task 是底层,但 ensure_future() 相比 create_task() 却是上层。
# python-src/Lib/asyncio/tasks.py
def _ensure_future(coro_or_future, *, loop=None):
...
if loop is None:
loop = events._get_event_loop(stacklevel=4)
try:
return loop.create_task(coro_or_future)
...

def create_task(coro, *, name=None, context=None):
loop = events.get_running_loop()
if context is None:
task = loop.create_task(coro)
else:
task = loop.create_task(coro, context=context)
...

ensure_future()create_task() 都会首先获取 loop,前者获取 _get_event_loop,后者 get_running_loop,所以在没有 loop 的环境下执行会有差异,比如 py 文件的第一级写如下代码:

task = asyncio.create_task(foobar(1)) # error:RuntimeError: no running event loop —— 报错退出
futu = asyncio.ensure_future(foobar(2)) # py3.9- PASS; py3.10+: DeprecationWarning: There is no current event loop

如果写在某个函数中,用 asyncio.run() 调用,则都 OK。

async def main():
await asyncio.create_task(foobar(1))
await asyncio.ensure_future(foobar(2))

asyncio.run(main())

上面代码演示的都是 awaitable 对象同步、串行执行的方式(通过 await 关键字),异步、并发执行则需要 asyncio.gather()asyncio.wait() 上场了。

复习 asyncio.gather() & asyncio.wait()

函数签名:

awaitable asyncio.gather(*aws, return_exceptions=False)

**并发(多个 awaitable 交替运行)**的典型方法是:

async def foobar(i):
_w = random.randint(1, 5)
await asyncio.sleep(_w)
print(_w, i)
return i + i

async def main():
tasks = [asyncio.create_task(foobar(_i)) for _i in range(3)]
futus = [asyncio.ensure_future(foobar(_i + 10)) for _i in range(3)]
await asyncio.gather(*tasks, *futus) # await asyncio.wait([*tasks, *futus])

asyncio.run(main())
# output:
# 2 12
# 3 10
# 4 0
# 4 1
# 4 2
# 5 11

注:上例仍是同步的,即线程(协程)在等待并发的协程完成才继续走。但这是为了便于演示,弄清楚了并发,设计一个异步并不是难事儿。

gatherwait 能达到差不多的并发效果,但在入参、返回值、如何取消等方面还是有差异:

文档:

# doc
coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) -> (done, pending)
awaitable asyncio.gather(*aws, return_exceptions=False) -> GatheringFuture

源码:

# python_src/Libasyncio/tasks.py
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
def gather(*coros_or_futures, return_exceptions=False):
  • 签名不同:
    • wait 是个协程函数,返回 task 的集合,await 的是个协程函数;
    • gather 是个普通函数,返回 Future,await 该 Future 得到 tasks 的运行结果列表。
  • 入参、返回值不同:
    • wait 入参是个无序集合,返回值也是无序的,还是个元祖:(已完成, 未完成)
    • gather 入参是依次多个 awaitable,返回值也是个 awaitable(与入参相同顺序的 list)。

wait 的入参 return_when 有 3 个可选值,控制 wait 函数的返回时机:

  • ALL_COMPLETED(默认):所有 task 完成,wait 才完成,此时 done 有包含所有 task,pending 没有 task。
  • FIRST_EXCEPTION:某一个 task 异常,wait 即完成,此时 done、pending 可能都有 task。
  • FIRST_COMPLETED:某一个 task 完成,wait 即完成,此时 done 仅有 1 个 task,pending 包含其他 task。——下面代码演示:

但即使 wait 返回了,其他 task 还是在继续运行的:

async def foobar(i):
_w = random.randint(1, 5)
await asyncio.sleep(_w)
return (i, _w)

async def main():
tasks = [asyncio.create_task(foobar(_i)) for _i in range(3)]
futus = [asyncio.ensure_future(foobar(_i + 10)) for _i in range(3)]
done, pending = await asyncio.wait(
[*tasks, *futus], return_when=asyncio.FIRST_COMPLETED
)
print([(_task.get_name(), _task._state, _task.result()) for _task in done])
print([(_task.get_name(), _task._state) for _task in pending])
await asyncio.sleep(5)
print([(_task.get_name(), _task._state, _task.result()) for _task in done])
print([(_task.get_name(), _task._state, _task.result()) for _task in pending])

asyncio.run(main())

# output:
# [('Task-7', 'FINISHED', (12, 1))]
# [('Task-6', 'PENDING'), ('Task-4', 'PENDING'), ('Task-5', 'PENDING'), ('Task-3', 'PENDING'), ('Task-2', 'PENDING')]
# [('Task-7', 'FINISHED', (12, 1))]
# [('Task-6', 'FINISHED', (11, 5)), ('Task-4', 'FINISHED', (2, 5)), ('Task-5', 'FINISHED', (10, 5)), ('Task-3', 'FINISHED', (1, 2)), ('Task-2', 'FINISHED', (0, 2))]

可见经过一段时间后,原来 PENDING 的 5 个 task 都 FINISHED 了,并且计算出了正确的结果。

但是如果遇到某个(些) task 抛异常,wait 的表现会很怪异:

async def foobar(i):
_w = random.randint(0, 5)
await asyncio.sleep(_w)
i / _w # 当 _w = 0 时异常
return (i, _w)


async def main():
tasks = [asyncio.create_task(foobar(_i)) for _i in range(3)]
futus = [asyncio.ensure_future(foobar(_i + 10)) for _i in range(3)]
try:
done, pending = await asyncio.wait(
[*tasks, *futus], return_when=asyncio.ALL_COMPLETED
)
except Exception as e:
print(e)
finally:
print([(_task.get_name(), _task._state) for _task in done])
# print([(_task.get_name(), _task._state, _task.result()) for _task in done]) # 主动引发异常
print([(_task.get_name(), _task._state) for _task in pending])

asyncio.run(main())
# output:
# 1
# 2
# 1
# 3
# 0
# 1
# [('Task-6', 'FINISHED'), ('Task-4', 'FINISHED'), ('Task-7', 'FINISHED'), ('Task-5', 'FINISHED'), ('Task-3', 'FINISHED'), ('Task-2', 'FINISHED')]
# []
# Task exception was never retrieved
# future: <Task finished name='Task-6' coro=<foobar() done, defined at /data/kevin/workspace/zproject/me/ME/blog/2022-11-11-python3.11/main.py:5> exception=ZeroDivisionError('division by zero')>
# Traceback (most recent call last):
# File "/data/kevin/workspace/zproject/me/ME/blog/2022-11-11-python3.11/main.py", line 9, in foobar
# i / _w # 当 _w = 0 时异常
# ~~^~~~
# ZeroDivisionError: division by zero

其中的 except 并不能捕获异常,当退出 main() 前释放 task 时会触发异常,如果去掉“主动引发异常”那行,则会在调用 _task.result() 时就触发异常。—— wait 处理异常的方式就比较蹩脚,想捕获看不到,不小心又触雷。

再来聊聊 gather():

gather 的入参 return_exceptions 含义是是否将异常和普通 result 一样对待,常规返回,有 2 个可选值:

  • False(默认):不随 result 一起返回,即:一旦某个 task 引发异常,则等待 gather 的外部任务立即感知,但其他 tasks 是不会被取消的,会继续在并发协程里运行。
  • True:异常随 result 一起返回。

下面代码演示 True 时情况:

async def foobar(i):
_w = random.randint(0, 5)
await asyncio.sleep(_w)
i / _w # 当 _w = 0 时异常
return i + i

async def main():
tasks = [asyncio.create_task(foobar(_i)) for _i in range(3)]
futus = [asyncio.ensure_future(foobar(_i + 10)) for _i in range(3)]
_results = await asyncio.gather(*tasks, *futus, return_exceptions=True)
print(_results)

asyncio.run(main())
# output:
# [ZeroDivisionError('division by zero'), 2, 4, 20, 22, 24]

如果 False,则会直接引发异常,就需要 try...except 上场了:

async def foobar(i):
_w = random.randint(0, 5)
await asyncio.sleep(_w)
i / _w # 当 _w = 0 时异常
return i + i

async def main():
tasks = [asyncio.create_task(foobar(_i)) for _i in range(3)]
futus = [asyncio.ensure_future(foobar(_i + 10)) for _i in range(3)]
try:
_results = await asyncio.gather(*tasks, *futus, return_exceptions=False)
except Exception as e:
print(e)
finally:
# print(_results)
pass

asyncio.run(main())

第一个抛异常的 task 会让 gather() 退出,此时不能 print(_results),因为 _results 是没有定义的。

总结一下对于 task 抛出的异常处理,gatherwait 策略异同:

  • waitreturn_when 控制返回时机,区分异常;抛异常的 task 视同正常完成的 task,放入 done 列表中;所以外层不捕获异常,但对异常 task 执行 task.result() 则引发异常,如果不处理异常销毁 task 时也会引发看似莫名其妙的异常。。
  • gatherreturn_exceptions 控制返回时机和返回值,True 是一起返回,异常放入返回值,False 是异常立即返回,外层可以捕获到异常,但其他 task 的返回值获取变得困难。

如何更加优雅的统筹 “返回时机”、“返回值”、“异常处理” 处理方式 —— 就是 python3.11 设计 asyncio.TaskGroup 的初衷。

class asyncio.TaskGroup

TaskGroup 是个 class,目前只公开了一个方法:create_task() - 将协程函数封装成 task,和一个 async with...as... 的用法 —— 简练而不简单。

create_task(coro, *, name=None, context=None)

源码实现也不复杂

# python_src/Lib/asyncio/taskgroups.py
class TaskGroup:
...
def create_task(self, coro, *, name=None, context=None):
...
if context is None:
task = self._loop.create_task(coro)
else:
task = self._loop.create_task(coro, context=context)
tasks._set_task_name(task, name)
task.add_done_callback(self._on_task_done)
self._tasks.add(task)
return task

官方推荐 async with...as... 典型用法:

async def main():
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(foobar(_i)) for _i in range(3)]
print([_task.result() for _task in tasks])

先说设计目标(即:当前的已实现的功能):

  • async with...as... 语句将等待组中所有 task 完成才退出,退出前执行 tg.__aexit__()
  • tg 执行期间,可以随时使用 tg.create_task() 添加新任务到组中。tg 已经全部完成之后不能再添加。
  • 组内某个 task 被取消(抛出 asyncio.CancelledError 异常),组内其他 task 不受影响。—— 可以单独取消某个 task,不波及其他 task。
  • 组内任何 task 抛出非 asyncio.CancelledError 的异常,组内其他任务将全部被 Cancell。—— 其他异常时,组内 task 同进退。
  • 所有在 async with...as... 中抛出的异常都是异常组,所以建议 try...except* 捕获。

看下面代码

import asyncio
import random

async def foobar(i):
_w = random.randint(1, 5) # 随机 sleep _w 秒
print(i, _w)
await asyncio.sleep(_w)
if _w == 1:
raise ValueError(i)
if _w == 2:
raise asyncio.CancelledError(i)
if _w == 3:
raise asyncio.TimeoutError(i)
return i + i

async def main():
try:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(foobar(_i), name=_i) for _i in range(3)]
print([_task.result() for _task in tasks])
except ValueError as e:
print(f"Catch ValueError: {e=}")
except Exception as e:
print(f"Catch Exception: {e=}")
except asyncio.CancelledError as ce: # `CancelledError` 比较特殊,继承自 `BaseException`
print(f"Catch CancelledError: {ce=}")
finally:
print([(_task.get_name(), _task._state) for _task in tasks]) # 打印所有 task 最终的结束状态

asyncio.run(main())

多次运行 $ time python main.py,得到几种不同的结果:

  • 所有 task 无异常
    • 3 个 task 都正常结束,执行了 async with...as... 外的 print 语句。
      0 4
      1 4
      2 5
      [0, 2, 4]
      [('0', 'FINISHED'), ('1', 'FINISHED'), ('2', 'FINISHED')]
      python main.py 0.10s user 0.01s system 2% cpu 5.105 total
  • 取消异常:不波及其他
    • 0 号 task 抛出 CancelledError,不影响其他 task 运行到 FINISHED,总共耗时 4s。
      0 2
      1 4
      2 4
      Catch CancelledError: ce=CancelledError(0)
      [('0', 'CANCELLED'), ('1', 'FINISHED'), ('2', 'FINISHED')]
      python main.py 0.07s user 0.02s system 2% cpu 4.092 total
    • 0、1 号 task (几乎)同时抛 CancelledError 异常,也不波及剩下的 1 个 task,4s 才退出。
      0 2
      1 2
      2 4
      Catch CancelledError: ce=CancelledError(0)
      [('0', 'CANCELLED'), ('1', 'CANCELLED'), ('2', 'FINISHED')]
      python main.py 0.10s user 0.00s system 2% cpu 4.104 total
  • 普通异常:同进退、异常组
    • 2 号 task 1s 时抛出普通异常,立即退出,其他 task 以 CANCELLED 结束,即使其他 task 在 3s 后也会抛异常(没机会了),捕获到的是异常组。
      0 3
      1 3
      2 1
      Catch Exception: e=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError(2)])
      [('0', 'CANCELLED'), ('1', 'CANCELLED'), ('2', 'FINISHED')]
      python main.py 0.09s user 0.01s system 8% cpu 1.099 total
    • 1、2 号 task (几乎)同时抛 TimeoutError 异常,会自动组合成异常组,而不会其中 1 个 cancell 掉另一个。—— 临近的普通异常自动组合
      0 5
      1 3
      2 3
      Catch Exception: e=ExceptionGroup('unhandled errors in a TaskGroup', [TimeoutError(1), TimeoutError(2)])
      [('0', 'CANCELLED'), ('1', 'FINISHED'), ('2', 'FINISHED')]
      python main.py 0.08s user 0.00s system 2% cpu 3.088 total
  • 取消 + 普通异常:不波及、其余同进退、异常组
    • 2 个 CancelledError + 1 个普通异常:普通异常合成异常组,4s 才退出。
      0 3
      1 2
      2 2
      Catch Exception: e=ExceptionGroup('unhandled errors in a TaskGroup', [TimeoutError(0)])
      [('0', 'FINISHED'), ('1', 'CANCELLED'), ('2', 'CANCELLED')]
      python main.py 0.09s user 0.01s system 3% cpu 3.105 total

TaskGroupgatherwait 对比

特性results = await gather()(done, pending) = await wait()async with TaskGroup() as tg
返回时机return_exceptions 控制:True全完成共同返回;False首异常立即返回。return_when 控制:全完成共同、首异常立即、首完成立即。非取消异常立即返回;其他全完成共同返回;
返回值Future.results()(done, pending)
外层捕获异常全完成共同返回时遍历查找返回值找出异常;首异常立即返回时可捕获。异常处理有坑,遇到异常 result() 或销毁时才会遇到。都需要外层捕获

所以:

  • 3 种方式都可以实现“全完成共同返回、首异常立即返回”,gatherwait 需要用入参控制,TaskGroup 没法控制。
  • gather 便于获取 result;wait 便于按状态分组;TaskGroup 便于绑定 task 命运共同体。
  • 如需要实现取消 1 个 task 而不影响组中其他 task 的需求时:gatherwait 受限或无法实现, TaskGroup 则就是为此而生的。

typing.TypeVarTuple: PEP 646

  • Python3.5 引入了 PEP 484 规范的类型标注(typing hints),定义了用于类型检查而 Any, Union, Tuple, Callable等,用于泛型的 TypeVar,OptionalGeneric等;
  • python3.10 新增了语法 T = int | str
  • Python3.11 引入了 PEP 646 规范的 TypeVarTuple,用于可变参数的泛型。

复习 TypeVar

类型标注是对参数文档的替代,可以使用 mypy 工具检查,但目前不会影响运行:

def foobar(name: str):
print(f"hello: {name}")

foobar(100)

类型标注为 str,但送入的 int 变量,mypy 可以提示错误,但仍可以正常运行。

$ python main.py
hello: 100
$ mypy main.py
main.py:13: error: Argument 1 to "foobar" has incompatible type "int"; expected "str" [arg-type]

强烈建议你将 mypy 配置到 VSCode 中,开启自动检查:VSCode -- Settings -- Python>Linting: Mypy Enabled -- 勾选。

这样你会在 100 下面看到红色的波浪号。

Union 等类型可以做限定某些类型的入参标注,有点泛型的那个意思了:

from typing import Union

def foobar(name: Union[str, int]):
print(f"hello: {name}")

foobar("Tom")
foobar(100)
# output:
# hello: Tom
# hello: 100

python3.10 引入了 T = int | str 后,又多了一个等效的选择:

U = int | str

def foobar(name: U):
print(f"hello: {name}")

foobar("Tom")
foobar(100)
# output:
# hello: Tom
# hello: 100

使用 TypeVar 可实现更通用的泛型:

from typing import TypeVar

T = TypeVar("T", int, str)

def foobar(name: T):
print(f"hello: {name}")

foobar("Tom")
foobar(100)
# output:
# hello: Tom
# hello: 100

TypeVar 可以添加任意多你指定的类型,甚至可以用 TypeVar("T") 表示不限定类型。

from typing import TypeVar, List, Dict

T = TypeVar("T", int, str, List, Dict)

def foobar(name: T):
print(f"hello: {name}")

foobar("Tom")
foobar(100)
foobar(True)
foobar(["Tom", 100, True])
foobar({"Tom": 100, "Jerry": 99})
# output:
# hello: Tom
# hello: 100
# hello: True
# hello: ['Tom', 100, True]
# hello: {'Tom': 100, 'Jerry': 99}

TypeVar 创建一个泛型类的签名为:

# python_src/Libtyping.py
class TypeVar(_Final, _Immutable, _BoundVarianceMixin, _PickleUsingNameMixin,_root=True):
def __init__(self, name, *constraints, bound=None, covariant=False, contravariant=False):
...

第一个参数是 name,后面跟类型的限制,没有则不限制,所以常见用法是:

T = TypeVar('T')  # Can be anything
A = TypeVar('A', str, bytes) # Must be str or bytes

所以你可能会发现:

Union[int, str]U = int | strT = TypeVar('T', int, str) 三者是等效的。

并且似乎 T = TypeVar('T') 就是我们想要的泛型了,其实不然。

from typing import TypeVar

U = int | str


def max_1(a: U, b: U) -> U:
return max(a, b)


T = TypeVar("T", int, str)


def max_2(a: T, b: T) -> T:
return max(a, b)


max_1("foo", 1) # mypy 没有给出错误提示
max_2("foo", 1) # mypy 可以识别标注,并给出错误提示
# Value of type variable "T" of "max_2" cannot be "object" [type-var]mypy(error)

TypeVar 比 Union 更深层而表达了泛型而一致性问题,同理:

def max_3(items: list[U]) -> U:
return max(*items)

def max_4(items: list[T]) -> T:
return max(*items)

max_3([1, 2, "3"]) # mypy 没有错误提示
max_4([1, 2, "3"]) # mypy 给出错误提示

复习 Generic

Generic 是个抽象基类,专用于创建泛型类型。

from typing import TypeVar, Generic, List

KT = TypeVar("KT", int, str)
VT = TypeVar("VT", int, str)


class MyDict(Generic[KT, VT]):
def __init__(self, keys: List[KT], values: List[VT]) -> None:
super().__init__()
self.data = {}
for _k, _v in zip(keys, values):
self.data[_k] = _v

def __getitem__(self, key: KT) -> VT:
return self.data[key]


def lookup(mapping: MyDict[KT, VT], key: KT, default: VT) -> VT:
try:
return mapping[key]
except KeyError:
return default


_m1 = MyDict([0, 1, 2], ["A", "B", "C"])
_m2 = MyDict(["A", "B", "C"], [0, 1, 2])
print(lookup(_m1, 0, "A"))
print(lookup(_m2, "B", 100))
# output:
# A
# 1

学习 TypeVarTuple

可变参数泛型一直是 python 社区的多年来强烈要求,比如 numpy 中换算图像矩阵:

def to_gray(videos: Array): ...

从签名上你无法看出 videos 是 batch × time × height × width × channels 还是 time × batch × channels × height × width,送错了参数可能很难察觉,用文档约束是历史上能用的、但不好用的手段。

python 社区提议用:

def to_gray(videos: Array[Time, Batch, Height, Width, Channels]): ...

即使允许了这种语法,函数的开发者将会感觉不灵活,没有泛型的加持后续升级就很难搞兼容性。

python3.11 提供了新的解决方案:TypeVarTuple

from typing import TypeVar, TypeVarTuple, NewType

DType = TypeVar('DType')
Shape = TypeVarTuple('Shape')

class Array(Generic[DType, *Shape]):

def __abs__(self) -> Array[DType, *Shape]: ...

def __add__(self, other: Array[DType, *Shape]) -> Array[DType, *Shape]: ...


Batch = NewType('Batch', int)
Height = NewType('Height', int)
Width = NewType('Width', int)
channels = NewType('channels', int)

def to_gray(videos: Array[float, Batch, Height, Width, channels]):...

用户调用:

from typing import Literal as L

x: Array[float, L[5], L[480], L[640], L[3]] = Array()
to_gray(x)

typing.Self: PEP 673

python3.11 之前,没有手段对返回实例自己的方法做返回值标注:

class Shape:
def set_scale(self, scale: float) -> Shape: # 语法错误
self._scale = scale
return self

def area(self):
pass

这可以说是标注技术的一个缺陷,3.11 填补这个遗憾,python 首先想到的也是让上面的语法能够不错误,但是:

class Circle(Shape):
def __init__(self, radius: int):
self._r = radius
self._scale = 1

def area(self):
return 3.14 * self._r * self._r * self._scale


c = Circle(3)
c = c.set_scale(0.5) # c 将用 Shape 做类型判断,而不是希望的 Circle
print(c.area())

可见返回 Shape 会让继承类很尴尬,虽然不影响执行,但类型标注和类型检查还是不正确。所以:新增了 Self。

from typing import Self

class Shape:
def set_scale(self, scale: float) -> Self:
self._scale = scale
return self

def area(self):
pass

class Circle(Shape):
def __init__(self, radius: int):
self._r = radius
self._scale = 1

def area(self):
return 3.14 * self._r * self._r * self._scale

def cut_circle(circle: Circle):
pass

c = Circle(3)
c = c.set_scale(0.5) # 函数签名为:(method) set_scale: (scale: float) -> Circle
cut_circle(c)

这样不但 checker 正确检查出了函数签名中返回值而标注,cut_circle(c) 中也不会报入参错误了。

Self 也可以做方法对的参数,还有更多用法参考 PEP673。

typing.LiteralString: PEP 675

  • Python3.8 引入了 PEP 586 规范的 typing.Literal(字面量)
  • Python3.11 引入 PEP 675 规范的 LiteralString

和前面一样,使用 typing 中的类型标注最终都是为了尽早、及时的找到 TypeError,无论是用 mypy 这种 checker,或者在运行时找到。

typing.Literal 是干啥的?—— 它是解决这种需求的:用户只想允许某些值(比如 3)作为变量/入参,不希望其他值(比如 1、2、4...),这种需求用 typing 中其他的类型标注实现不了,所以设计了 Literal。

from typing import Literal

def accepts_only_four(x: Literal[4]) -> None:
pass

accepts_only_four(4) # OK
accepts_only_four(19) # mypy 报错:
# Argument 1 to "accepts_only_four" has incompatible type "Literal[19]"; expected "Literal[4]" [arg-type]mypy(error)

或者字符字面量:

MODE = Literal["r", "rb", "w", "wb"]

def open_helper(file: str, mode: MODE) -> str:
pass

open_helper("/some/path", "r") # Passes type check
open_helper("/other/path", "typo") # mypy 报错:
# Argument 2 to "open_helper" has incompatible type "Literal['typo']"; expected "Literal['r', 'rb', 'w', 'wb']" [arg-type]mypy(error)

所谓字面量类型检查,就是要字面上一模一样匹配才算检查通过。

假如有这样一个操作 SQL 数据库而函数:

def query_user(conn: Connection, user_id: str) -> User:
query = f"SELECT * FROM data WHERE user_id = {user_id}"
conn.execute(query)
... # Transform data to a User object and return it

用户执行下面是正常操作:

query_user(conn, "user123")  # OK.

但如果用户执行下面操作将会删除整个 table:

query_user(conn, "user123; DROP TABLE data;")

如何防止这样的“SQL 注入”攻击?—— LiteralString:仅接受由已知字面量组成的字符串。

from typing import LiteralString

def query_user(sql: LiteralString) -> ...
...

具体是如何检查错误的……似乎还没实现好,mypy 也尚不支持。

typing.dataclass_transform: PEP 681

python3.7 引入了 PEP 557 规范的 dataclasses,用于存储数据对象(如:数据库相关联的 data 对象),这类对象一般具有的特征是:

  • 同类,并可比较
  • 值很重要,不需要太多方法
  • 一旦赋值,通常不可变

复习 @dataclasses.dataclass 装饰器

dataclasses 模块提供了一个装饰器 @dataclass 来方便我们实现这样的对象:

from dataclasses import dataclass
from typing import List


@dataclass()
class AAU:
name: str
type: str
SLAVE_NUM: int
CHN_NUM: int
BAND0: dict
opt_port: List


aau = AAU(
name="9642A",
type="S26",
SLAVE_NUM=2,
CHN_NUM=32,
BAND0={"start_freq": 2515000, "stop_freq": 2675000},
opt_port=[21, 19],
)
print(aau)
# output:
# AAU(name='9642A', type='S26', SLAVE_NUM=2, CHN_NUM=32, BAND0={'start_freq': 2515000, 'stop_freq': 2675000}, opt_port=[21, 19])

装饰器 @dataclass() 为你自动生成 __init__(), __eq__()__lt__() 等方法,当然你也可以不让它自动生成,自己来手动实现。

学习 @typing.dataclass_transform 装饰器

待补充

typing.TypedDict: PEP 655

python3.8 引入了 typing.TypedDict,让字典类型使用更方便:

from typing import TypedDict

class Point2D(TypedDict):
x: int
y: int
label: str

a: Point2D = {"x": 1, "y": 2, "label": "good"} # OK
b: Point2D = {"z": 3, "label": "bad"} # Fails type check
c: Point2D = {"label": "bad"} # Fails type check

assert Point2D(x=1, y=2, label="first") == dict(x=1, y=2, label="first")
  1. 定义、赋值都更方便了
  2. 增加不满足预期的 key: value 会报错(如:b 中增加了 z)
  3. 没有定义需要的 key: value 也会报错(如:c 中缺少 x、y)

后 2 点有点极端,不能多也不能缺,而 PEP 655 通过引入 Required[]NotRequired[] 可以让 TypedDict 里面的键的定义明确是否强依赖:

from typing import TypedDict, Required, NotRequired

class Point2D(TypedDict):
x: Required[int]
y: NotRequired[int]
label: str

这样在实例化对象时检查器就能够正确区分了。这种功能在 typescript 和 go 中最初就设计了,python 发展了 20 年了才加入。

getmembers_static

3.11 之前:

getattr(obj, attrname)hasattr(obj, attrname) 都可能会在获取或者判断属性是否存在时触发代码执行,尤其是写在获取目标所在 py 文件根级别下的代码都会被执行,及其 import 的文件。

inspect.getmembers(obj) 获取 obj 所有的属性,同样也会触发代码的执行。

为了更省资源的获取属性和方法,python3.2 引入了 inspect.getattr_static(obj, attrname),可以静态的内省,获取属性而不触发动态查找功能,但只是获取指定 name 的 attr。获取所有 members 没有静态的方法。

3.11,时隔 9 年,终于加入了 inspect.getmembers_static(obj) —— 终于有了静态内省获取所有成员的函数了。

缺陷与陷阱

最后,泼一点冷水。

不要升级这么快,至少等到几个月后的 3.11.1 或 3.11.2,一般这 2 个都是比较重要或重大的 bugfix,追求稳定性的话,不如再等几个月。

其次,distutils 曾经是 python 打包的官方必选,写个 setup.py 就能打包了,但 distutils 即将与 3.12 废弃,3.11 中也开始提示预废弃,并提示用 setuptools 替代。setuptools 使用 setup.cfg 作为打包配置文件,为了兼容,python 官方推荐用 setuptools 入口运行 setup.py —— 一个蹩脚的方案。

setup.cfg 当前简直是个垃圾桶,大量与打包无关的内容写入其中,而不是更应该去的 project.toml,使得 setup 这个单词失去了应有的意义。这可能是官方此次加入 tomllib 的用意吧:赶紧吧 project.toml 给我用起来。