forked from iovisor/bcc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_uprobes.py
More file actions
executable file
·140 lines (127 loc) · 4.51 KB
/
Copy pathtest_uprobes.py
File metadata and controls
executable file
·140 lines (127 loc) · 4.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")
import bcc
import ctypes
import errno
import os
import subprocess
import shutil
import time
import unittest
class TestUprobes(unittest.TestCase):
def test_simple_library(self):
text = """
#include <uapi/linux/ptrace.h>
BPF_ARRAY(stats, u64, 1);
static void incr(int idx) {
u64 *ptr = stats.lookup(&idx);
if (ptr)
++(*ptr);
}
int count(struct pt_regs *ctx) {
bpf_trace_printk("count() uprobe fired");
u32 pid = bpf_get_current_pid_tgid();
if (pid == PID)
incr(0);
return 0;
}"""
test_pid = os.getpid()
text = text.replace("PID", "%d" % test_pid)
b = bcc.BPF(text=text)
b.attach_uprobe(name="c", sym="malloc_stats", fn_name="count", pid=test_pid)
b.attach_uretprobe(name="c", sym="malloc_stats", fn_name="count", pid=test_pid)
libc = ctypes.CDLL("libc.so.6")
libc.malloc_stats.restype = None
libc.malloc_stats.argtypes = []
libc.malloc_stats()
self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
b.detach_uretprobe(name="c", sym="malloc_stats", pid=test_pid)
b.detach_uprobe(name="c", sym="malloc_stats", pid=test_pid)
def test_simple_binary(self):
text = """
#include <uapi/linux/ptrace.h>
BPF_ARRAY(stats, u64, 1);
static void incr(int idx) {
u64 *ptr = stats.lookup(&idx);
if (ptr)
++(*ptr);
}
int count(struct pt_regs *ctx) {
u32 pid = bpf_get_current_pid_tgid();
incr(0);
return 0;
}"""
b = bcc.BPF(text=text)
b.attach_uprobe(name="/usr/bin/python", sym="main", fn_name="count")
b.attach_uretprobe(name="/usr/bin/python", sym="main", fn_name="count")
with os.popen("/usr/bin/python -V") as f:
pass
self.assertGreater(b["stats"][ctypes.c_int(0)].value, 0)
b.detach_uretprobe(name="/usr/bin/python", sym="main")
b.detach_uprobe(name="/usr/bin/python", sym="main")
def test_mount_namespace(self):
text = """
#include <uapi/linux/ptrace.h>
BPF_TABLE("array", int, u64, stats, 1);
static void incr(int idx) {
u64 *ptr = stats.lookup(&idx);
if (ptr)
++(*ptr);
}
int count(struct pt_regs *ctx) {
bpf_trace_printk("count() uprobe fired");
u32 pid = bpf_get_current_pid_tgid();
if (pid == PID)
incr(0);
return 0;
}"""
# Need to import libc from ctypes to access unshare(2)
libc = ctypes.CDLL("libc.so.6", use_errno=True)
# Need to find path to libz.so.1
libz_path = None
p = subprocess.Popen(["ldconfig", "-p"], stdout=subprocess.PIPE)
for l in p.stdout:
n = l.split()
if n[0] == b"libz.so.1":
# if libz was already found, override only if new lib is more
# specific (e.g. libc6,x86-64 vs libc6)
if not libz_path or len(n[1].split(b",")) > 1:
libz_path = n[-1]
p.wait()
p.stdout.close()
p = None
self.assertIsNotNone(libz_path)
# fork a child that we'll place in a separate mount namespace
child_pid = os.fork()
if child_pid == 0:
# Unshare CLONE_NEWNS
if libc.unshare(0x00020000) == -1:
e = ctypes.get_errno()
raise OSError(e, errno.errorcode[e])
# Remount root MS_REC|MS_PRIVATE
if libc.mount(None, b"/", None, (1<<14)|(1<<18) , None) == -1:
e = ctypes.get_errno()
raise OSError(e, errno.errorcode[e])
if libc.mount(b"tmpfs", b"/tmp", b"tmpfs", 0, None) == -1:
e = ctypes.get_errno()
raise OSError(e, errno.errorcode[e])
shutil.copy(libz_path, b"/tmp")
libz = ctypes.CDLL("/tmp/libz.so.1")
time.sleep(1)
libz.zlibVersion()
time.sleep(5)
os._exit(0)
libname = "/tmp/libz.so.1"
symname = "zlibVersion"
text = text.replace("PID", "%d" % child_pid)
b = bcc.BPF(text=text)
b.attach_uprobe(name=libname, sym=symname, fn_name="count", pid=child_pid)
b.attach_uretprobe(name=libname, sym=symname, fn_name="count", pid=child_pid)
time.sleep(1)
self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
b.detach_uretprobe(name=libname, sym=symname, pid=child_pid)
b.detach_uprobe(name=libname, sym=symname, pid=child_pid)
os.wait()
if __name__ == "__main__":
unittest.main()