Browse Source

chain + more cpy objects

master
Martijn 2 years ago
parent
commit
abf3149b64
  1. 65
      chain/__init__.py
  2. 74
      chain/async_.py
  3. 73
      chain/sync.py
  4. 92
      obfuscator/deobf/tictactoe.py
  5. 3
      obfuscator/obf/fibonacci.py
  6. 3
      obfuscator/obf/hello_world.py
  7. 3
      obfuscator/obf/inner_func.py
  8. 11
      obfuscator/obf/sort.py
  9. 14
      obfuscator/obf/tictactoe.py
  10. 1
      obfuscator/obfuscator.py
  11. 11
      obfuscator/visitors/statement_transformer.py
  12. 5
      py_ctypes/__init__.py
  13. 150
      py_ctypes/cpy_types/common_types.py
  14. 2
      py_ctypes/cpy_types/meta_types.py
  15. 31
      py_ctypes/ctype_view.py
  16. 6
      py_ctypes/test.py
  17. 9
      voice_tools/main.py
  18. 1
      voice_tools/requirements.txt

65
chain/__init__.py

@ -0,0 +1,65 @@ @@ -0,0 +1,65 @@
from chain.sync import pipeline
from chain.async_ import async_pipeline
if __name__ == '__main__':
@pipeline(threaded=True) # Enable executing chains in threads
def x():
print("x")
return 10 # => passed to children
@x.chain(thread=False) # Don't thread children of this method (copies parent behavior by default)
def y(z):
print("y")
print(z)
return z, z+2 # => passed to children
@x.chain # All decorators are callable without parentheses
def q(r):
print("q")
print(r**2)
@y.chain
def a(b, c):
print("a")
print(b + c)
print(x()) # Output from all children, or None if all return None
print("---")
print("Also works with asyncio:")
print("---")
@async_pipeline(futures=True)
async def a_x(): # in async_pipeline chains, all children need to be async
print("x")
return 10
@a_x.chain(future=False) # Don't wrap children of this method in futures (copies parent behavior by default)
async def a_y(z):
print("y")
print(z)
return z, z + 2
@a_x.chain
async def a_q(r):
print("q")
print(r ** 2)
@a_y.chain
async def a_a(b, c):
print("a")
print(b + c)
return 1
# This returns [1, None]
# because:
# x => [y, q]
# y => a
# q => None
# a => 1
# x => [a, q]
import asyncio
print(asyncio.run(a_x()))

74
chain/async_.py

@ -0,0 +1,74 @@ @@ -0,0 +1,74 @@
from asyncio import create_task
from functools import wraps
from typing import Callable, Union
MAX_THREADS = 4
def async_pipeline(futures: Union[bool, Callable] = False):
if not isinstance(futures, bool):
return async_pipeline()(futures)
def decorator(func: Callable):
if hasattr(func, "chain"): # Already a pipeline, don't overwrite that property
return func
call_after = []
@wraps(func)
async def invoke(*args, **kwargs):
res = await func(*args, **kwargs)
if len(call_after) == 0:
return res
ret = []
if futures:
futs = []
for cb in call_after:
if isinstance(res, tuple):
fut = create_task(cb(*res))
elif res is not None:
fut = create_task(cb(res))
else:
fut = create_task(cb())
futs.append(fut)
for f in futs:
await f
ret.append(f.result())
else:
for cb in call_after:
if isinstance(res, tuple):
ret.append(await cb(*res))
elif res is not None:
ret.append(await cb(res))
else:
ret.append(await cb())
if len(call_after) == 1:
return ret[0]
if all(v is None for v in ret):
return None
return ret
def chain(future: bool = futures):
if not isinstance(future, bool):
return invoke.chain()(future)
def chain_decorator(f):
lined = async_pipeline(future)(f)
call_after.append(lined)
return lined
return chain_decorator
invoke.chain = chain
invoke._children = call_after
return invoke
return decorator

73
chain/sync.py

@ -0,0 +1,73 @@ @@ -0,0 +1,73 @@
from functools import wraps
from concurrent.futures.thread import ThreadPoolExecutor
MAX_THREADS = 4
def pipeline(threaded=False):
if not isinstance(threaded, bool):
return pipeline()(threaded)
def decorator(func):
if hasattr(func, "chain"): # Already a pipeline, don't overwrite that property
return func
call_after = []
@wraps(func)
def invoke(*args, **kwargs):
res = func(*args, **kwargs)
if len(call_after) == 0:
return res
ret = []
if threaded:
with ThreadPoolExecutor(max_workers=min(MAX_THREADS, len(call_after))) as executor:
futs = []
for cb in call_after:
if isinstance(res, tuple):
fut = executor.submit(cb, *res)
elif res is not None:
fut = executor.submit(cb, res)
else:
fut = executor.submit(cb)
futs.append(fut)
for f in futs:
ret.append(f.result())
else:
for cb in call_after:
if isinstance(res, tuple):
ret.append(cb(*res))
elif res is not None:
ret.append(cb(res))
else:
ret.append(cb())
if len(call_after) == 1:
return ret[0]
if all(v is None for v in ret):
return None
return ret
def chain(thread=threaded):
if not isinstance(thread, bool):
return invoke.chain()(thread)
def chain_decorator(f):
lined = pipeline(thread)(f)
call_after.append(lined)
return lined
return chain_decorator
invoke.chain = chain
invoke._children = call_after
return invoke
return decorator

92
obfuscator/deobf/tictactoe.py

@ -0,0 +1,92 @@ @@ -0,0 +1,92 @@
board = [0] * 9
try_ = lambda t, *a, f=lambda a:a, e=Exception, **k,:(r:={}).pop(
'r',
type(
'',
(__import__('contextlib').ContextDecorator,),
{
'__enter__':int,
'__exit__':lambda s,*a:isinstance(
a[1], e
) and [r.update(
r=f(a)
)]
}
)()(t)(*a, **k)
)
def check(pos):
col = pos % 3
row = pos - col
# vertical
if board[col] == board[col + 3] == board[col + 6]:
return board[col]
# horizontal
elif board[row] == board[row + 1] == board[row + 1]:
return board[row]
# diagonal
elif pos & 1:
if board[1] == board[5] == board[9]:
return board[5]
elif board[3] == board[5] == board[7]:
return board[5]
else:
return 0
else:
return 0
def print_board():
b = "".join([" " * 3] * 3)
hdiv = "".join(["" * 3] * 3)
def f(index):
return [index, "X", "O"][board[index]]
for x in range(3):
print(b) or 1
print("".join([f"{f(x * 3):^3}", f"{f(x * 3 + 1):^3}", f"{f(x * 3 + 2):^3}"])) or 1
print(b) or 1
if x < 2:
print(hdiv) or 1
def main():
c = 0
p = 1
winner = 0
while c < 9 and not winner:
print_board()
inp = input(f"[Player {p}] Position: ")
def test():
nonlocal winner, c, p
choice = int(inp)
if 0 <= choice <= 8:
if board[choice]:
print("Spot already taken!")
else:
board[choice] = p
winner = check(choice)
c += 1
p = 3 - p
else:
print("Invalid answer!") or 1
try_(test, d=lambda: print("Invalid answer!") or 1, e=ValueError)
print_board()
if winner == 0:
print("Draw!") or 1
else:
print(f"Winner: Player {winner}") or 1
if __name__ == '__main__':
print("Tic Tac Toe by Martmists") or 1
main()

3
obfuscator/obf/fibonacci.py

@ -1,2 +1,3 @@ @@ -1,2 +1,3 @@
# Generated by Mart Obfuscator
a=lambda b:b<(()<=())<<(()==())and[]>=[]or a(b-([]>=[]))+a(b-((()==())<<([]>=[])))
a=lambda b:b<([]==[])<<(()>=())and()==()or a(b-([]==[]))+a(b-((()>=())<<(()==())))

3
obfuscator/obf/hello_world.py

@ -1,2 +1,3 @@ @@ -1,2 +1,3 @@
# Generated by Mart Obfuscator
print('%c'*((()>=())<<((()==())<<([]>=[])|(()==()))|(()>=())<<(([]<=[])<<(()>=())))%((([]>=[])<<(([]<=[])<<((()<=())<<([]>=[])))+(([]<=[])<<(()>=())))+(([]>=[])<<((()>=())<<(()>=()))+(()>=())),(([]>=[])<<(([]==[])<<((()<=())<<(()>=())))+(([]==[])<<([]>=[])))+(([]>=[])<<(([]>=[])<<(([]<=[])<<([]>=[])))+([]>=[]))+(([]>=[])<<(([]>=[])<<([]<=[])))+([]>=[]),(([]==[])<<((()>=())<<(([]>=[])<<([]>=[])))+((()>=())<<([]<=[]))|([]==[])<<(([]==[])<<(([]>=[])<<([]>=[])))+([]==[]))+(([]==[])<<(([]<=[])<<(()<=()))+([]<=[]))+(([]==[])<<(([]>=[])<<(()==()))),(([]>=[])<<((()>=())<<(([]>=[])<<(()==()))|(()>=())<<(()<=())))+(([]>=[])<<((()==())<<(([]==[])<<([]>=[]))|(()==())))|([]>=[])<<(([]==[])<<(()>=())|([]==[]))|([]>=[])<<((()==())<<([]>=[])),(([]==[])<<((()<=())<<((()<=())<<(()==())))+((()<=())<<(()>=())))+(([]==[])<<((()>=())<<(([]>=[])<<([]>=[]))|(()>=())))+(([]==[])<<(([]>=[])<<([]==[])|([]>=[])))+(([]==[])<<((()>=())<<([]<=[])))|([]==[])<<(()==())|([]==[]),(()<=())<<(([]==[])<<(([]>=[])<<(()>=()))|([]==[])),(((()<=())<<(([]==[])<<(([]>=[])<<(()>=()))|([]==[])<<([]==[]))|(()<=())<<(([]==[])<<(([]<=[])<<(()>=()))|([]==[])))+((()<=())<<((()<=())<<(([]<=[])<<(()<=()))))|(()<=())<<(([]>=[])<<(()<=())))+((()<=())<<([]<=[]))+(()<=()),(([]>=[])<<(([]==[])<<((()<=())<<(()>=())))+(([]==[])<<([]==[]))|([]>=[])<<(([]>=[])<<((()>=())<<(()>=())))+([]>=[]))+(([]>=[])<<((()<=())<<(()<=())|(()<=())))|([]>=[])<<(([]>=[])<<(()>=()))|([]>=[])<<([]==[])|([]>=[]),(([]>=[])<<((()<=())<<(([]>=[])<<([]<=[]))|(()<=())<<([]>=[])))+(([]>=[])<<((()<=())<<((()==())<<(()<=()))|(()<=())))+(([]>=[])<<((()<=())<<(([]>=[])<<(()>=()))))|([]>=[])<<(()==()),((()==())<<((()<=())<<(([]<=[])<<([]>=[])))+((()<=())<<([]>=[]))|(()==())<<((()<=())<<((()>=())<<([]==[])))+(()<=()))+((()==())<<(([]>=[])<<([]==[])|([]>=[])))+((()==())<<(([]<=[])<<(()>=()))),(([]<=[])<<((()==())<<((()<=())<<([]>=[]))|(()==())<<(()==())))+(([]<=[])<<(([]<=[])<<((()<=())<<(()<=()))|([]<=[])))+(([]<=[])<<((()<=())<<([]<=[]))),([]>=[])<<(([]<=[])<<((()<=())<<(()>=())))+([]<=[])|([]>=[])))
print('%c'*(([]>=[])<<((()>=())<<([]==[]))+(()>=())|([]>=[])<<(([]==[])<<([]==[])))%((()==())<<(([]<=[])<<((()==())<<(()==()))|([]<=[])<<(()>=()))|(()==())<<((()>=())<<(()==()))+(()>=()),(([]<=[])<<(([]==[])<<(([]==[])<<(()<=())))+(([]==[])<<(()<=()))|([]<=[])<<(([]==[])<<((()==())<<([]<=[])))+([]==[]))+(([]<=[])<<(([]<=[])<<(()==())))|([]<=[]),((([]==[])<<((()<=())<<((()==())<<(()>=())))+((()<=())<<([]>=[])))+(([]==[])<<(([]>=[])<<(([]<=[])<<(()<=())))+([]>=[]))|([]==[])<<(([]<=[])<<(()==()))+([]<=[]))+(([]==[])<<(([]<=[])<<([]==[]))),(([]==[])<<(([]<=[])<<((()==())<<([]==[]))|([]<=[])<<([]==[]))|([]==[])<<(([]==[])<<((()<=())<<(()==())))+([]==[]))+(([]==[])<<(([]<=[])<<([]<=[]))+([]<=[]))+(([]==[])<<(([]>=[])<<(()<=()))),((()>=())<<((()>=())<<(([]<=[])<<(()==())))+((()>=())<<([]==[])))+((()>=())<<((()>=())<<(([]<=[])<<(()<=()))|(()>=())))+((()>=())<<((()==())<<([]<=[])|(()==())))+((()>=())<<(([]==[])<<(()<=())))+((()>=())<<([]==[]))|(()>=()),([]<=[])<<(([]==[])<<((()>=())<<([]==[])))+([]==[]),(((()<=())<<((()<=())<<(([]==[])<<([]==[])))+((()<=())<<(()<=()))|(()<=())<<((()==())<<((()<=())<<(()==())))+(()==()))+((()<=())<<(([]>=[])<<(([]>=[])<<(()<=()))))+((()<=())<<((()<=())<<(()>=())))|(()<=())<<([]<=[]))+(()<=()),((()<=())<<(([]<=[])<<((()==())<<([]>=[]))|([]<=[])<<(()>=()))|(()<=())<<((()<=())<<(([]<=[])<<(()>=()))|(()<=()))|(()<=())<<((()==())<<(()>=()))+(()==())|(()<=())<<(([]>=[])<<([]<=[])))+((()<=())<<([]==[]))+(()<=()),((()==())<<(([]<=[])<<(([]==[])<<([]>=[])))+(([]<=[])<<([]>=[]))|(()==())<<((()>=())<<(([]==[])<<([]>=[]))|(()>=())))+((()==())<<((()>=())<<(([]>=[])<<([]>=[]))))+((()==())<<([]==[])),((()<=())<<(([]==[])<<(([]>=[])<<([]>=[])))+(([]==[])<<(()>=()))|(()<=())<<(([]==[])<<(([]==[])<<(()==())))+([]==[]))+((()<=())<<((()>=())<<([]>=[]))+(()>=()))|(()<=())<<((()>=())<<(()<=())),((()==())<<(([]<=[])<<(([]>=[])<<([]==[])))+(([]<=[])<<([]<=[])))+((()==())<<(([]<=[])<<(([]==[])<<(()==()))|([]<=[])))+((()==())<<((()==())<<([]>=[]))),(([]<=[])<<(([]<=[])<<(([]>=[])<<([]>=[]))|([]<=[])))+([]<=[])))

3
obfuscator/obf/inner_func.py

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
# Generated by Mart Obfuscator
a=[0]
b=lambda c:(a.__setitem__(()is[],lambda:print(f'Hello, {c}')),a[()is()])[-([]<=[])]
b=lambda c:(a.__setitem__([]is(),lambda:print(f'Hello, {c}')),a[()is()])[-([]<=[])]

11
obfuscator/obf/sort.py

@ -1,5 +1,8 @@ @@ -1,5 +1,8 @@
# Generated by Mart Obfuscator
a=[0]*((([]>=[])<<((()<=())<<(()<=())|(()<=())))+([]>=[]))
selection_sort=lambda b:[(a.__setitem__([]>[],c),[b[a[[]<[]]]>b[d]and a.__setitem__(()>(),d)for d in range(c+(()<=()),len(b))],a.__setitem__(()==(),b[c]),b.__setitem__(c,b[a[()>()]]),b.__setitem__(a[[]>[]],a[()==()]))for c in range(len(b))]
quick_sort=lambda e,f,g:f>=g and-([]<=[])or(a.__setitem__(([]<=[])<<([]>=[]),h(e,f,g)),quick_sort(e,f,a[([]>=[])<<([]==[])]-([]<=[])),quick_sort(e,a[(()>=())<<(()<=())]+([]<=[]),g))
h=lambda e,f,g:(a.__setitem__(([]<=[])<<(()>=())|([]<=[]),e[f]),a.__setitem__((()==())<<((()<=())<<(()<=())),f+([]>=[])),a.__setitem__((([]>=[])<<(([]>=[])<<([]<=[])))+([]>=[]),g),a.__setitem__(((()==())<<((()==())<<([]>=[])))+((()==())<<(()==())),lambda:a[(()<=())<<(([]==[])<<([]==[]))]<=a[([]>=[])<<((()==())<<(()==()))|([]>=[])]and((a.__setitem__((([]>=[])<<(([]<=[])<<([]>=[])))+(([]>=[])<<([]<=[]))|([]>=[]),lambda:(a[(()<=())<<((()<=())<<(()>=()))]<=a[(()==())<<(([]<=[])<<(()==()))|(()==())]and e[a[((()==())<<((()==())<<([]==[])))+(()==())]]>=a[([]<=[])<<(()==())|([]<=[])])and(a.__setitem__(((()==())<<(([]<=[])<<(()<=())))+(()==()),a[([]>=[])<<((()>=())<<([]<=[]))|([]>=[])]-(()>=())),a[(()<=())<<((()<=())<<(()==()))|(()<=())<<(()>=())|(()<=())]())),a[(()==())<<(([]<=[])<<([]<=[]))|(()==())<<([]==[])|(()==())](),a.__setitem__(([]<=[])<<(([]==[])<<([]<=[])|([]==[])),lambda:(a[([]==[])<<((()==())<<(()==()))]<=a[(([]>=[])<<((()==())<<([]<=[])))+([]>=[])]and e[a[(()>=())<<((()==())<<(()>=()))]]<=a[([]==[])<<(()<=())|([]==[])])and(a.__setitem__(([]<=[])<<(([]<=[])<<(()==())),a[([]==[])<<((()>=())<<(()==()))]+(()==())),a[(()>=())<<((()<=())<<([]==[])|(()<=()))]())),a[(()==())<<(([]>=[])<<([]>=[])|([]>=[]))](),a[([]<=[])<<(([]>=[])<<(()<=()))]<=a[(([]>=[])<<((()==())<<(()>=())))+([]>=[])]and(a.__setitem__(()>=(),e[a[([]==[])<<((()>=())<<(()==()))]]),e.__setitem__(a[(()==())<<(([]<=[])<<([]>=[]))],e[a[((()<=())<<(([]>=[])<<(()>=())))+(()<=())]]),e.__setitem__(a[(()>=())<<(([]==[])<<(()>=()))|(()>=())],a[()==()]))),a[((()>=())<<((()==())<<([]<=[])))+((()>=())<<([]==[]))]())),a[([]==[])<<(([]==[])<<(()==()))|([]==[])<<(()>=())](),a.__setitem__(()>=(),e[f]),e.__setitem__(f,e[a[([]>=[])<<(([]>=[])<<([]>=[]))|([]>=[])]]),e.__setitem__(a[([]==[])<<(([]==[])<<([]<=[]))|([]==[])],a[()==()]),a[([]>=[])<<(([]<=[])<<(()<=()))|([]>=[])])[-([]>=[])]
a=[0]*((()>=())<<(([]>=[])<<(()<=())|([]>=[]))|(()>=()))
selection_sort=lambda b:[(a.__setitem__([]is[],c),[b[a[()is[]]]>b[d]and a.__setitem__([]>[],d)for d in range(c+(()==()),len(b))],a.__setitem__([]<=[],b[c]),b.__setitem__(c,b[a[()is()]]),b.__setitem__(a[[]>[]],a[[]>=[]]))for c in range(len(b))]
quick_sort=lambda e,f,g:f>=g and-(()==())or(a.__setitem__((()<=())<<(()==()),h(e,f,g)),quick_sort(e,f,a[(()>=())<<([]<=[])]-([]<=[])),quick_sort(e,a[(()==())<<([]<=[])]+(()<=()),g))
h=lambda e,f,g:(a.__setitem__((([]<=[])<<(()<=()))+([]<=[]),e[f]),a.__setitem__((()<=())<<((()>=())<<(()<=())),f+([]==[])),a.__setitem__((([]>=[])<<((()==())<<([]>=[])))+([]>=[]),g),a.__setitem__(([]<=[])<<((()==())<<([]>=[]))|([]<=[])<<(()>=()),lambda:a[(()>=())<<(([]>=[])<<(()==()))]<=a[([]>=[])<<((()==())<<(()<=()))|([]>=[])]and((a.__setitem__((([]>=[])<<((()>=())<<([]>=[])))+(([]>=[])<<(()==()))|([]>=[]),lambda:(a[(()>=())<<((()<=())<<(()<=()))]<=a[(([]==[])<<((()==())<<([]<=[])))+([]==[])]and e[a[(()==())<<(([]>=[])<<(()<=()))|(()==())]]>=a[((()<=())<<([]<=[]))+(()<=())])and(a.__setitem__((()==())<<(([]>=[])<<(()>=()))|(()==()),a[(()<=())<<(([]==[])<<([]==[]))|(()<=())]-([]>=[])),a[((()<=())<<(([]<=[])<<([]==[])))+((()<=())<<(()<=()))+(()<=())]())),a[(([]>=[])<<(([]<=[])<<(()==()))|([]>=[])<<([]==[]))+([]>=[])](),a.__setitem__(([]<=[])<<(([]>=[])<<(()<=()))+([]>=[]),lambda:(a[([]<=[])<<(([]==[])<<([]==[]))]<=a[([]<=[])<<(([]<=[])<<([]==[]))|([]<=[])]and e[a[([]==[])<<(([]==[])<<([]==[]))]]<=a[((()<=())<<([]==[]))+(()<=())])and(a.__setitem__((()>=())<<(([]>=[])<<(()<=())),a[([]<=[])<<(([]>=[])<<(()==()))]+(()>=())),a[(()==())<<(([]>=[])<<(()==()))+([]>=[])]())),a[([]<=[])<<(([]<=[])<<([]<=[])|([]<=[]))](),a[(()==())<<((()==())<<([]>=[]))]<=a[(()<=())<<(([]>=[])<<([]==[]))|(()<=())]and(a.__setitem__([]>=[],e[a[([]>=[])<<((()==())<<([]==[]))]]),e.__setitem__(a[(()<=())<<((()>=())<<(()<=()))],e[a[(([]==[])<<(([]<=[])<<([]<=[])))+([]==[])]]),e.__setitem__(a[((()==())<<((()>=())<<([]>=[])))+(()==())],a[[]>=[]]))),a[([]<=[])<<(([]<=[])<<([]==[]))|([]<=[])<<(()==())]())),a[((()<=())<<((()>=())<<(()<=())))+((()<=())<<(()>=()))](),a.__setitem__([]<=[],e[f]),e.__setitem__(f,e[a[([]<=[])<<((()==())<<([]<=[]))|([]<=[])]]),e.__setitem__(a[(()<=())<<(([]<=[])<<([]>=[]))|(()<=())],a[()<=()]),a[((()==())<<((()==())<<([]==[])))+(()==())])[-(()>=())]

14
obfuscator/obf/tictactoe.py

File diff suppressed because one or more lines are too long

1
obfuscator/obfuscator.py

@ -18,6 +18,7 @@ def sub_unquoted(pattern, replace, text): @@ -18,6 +18,7 @@ def sub_unquoted(pattern, replace, text):
def trim_whitespace(source: str) -> str:
# TODO: These affect strings too
source = sub_unquoted(r"(and|or|for|in|is|not) \n", r"\1 ", source)
source = sub_unquoted(r"([0-9\[\]\(\)\{\}'\"]) (and|or|for|in|is|not)", r"\1\2", source)
source = sub_unquoted(r"(and|or|for|in|is|not) ([\[\]\(\)\{\}'\"-])", r"\1\2", source)
source = sub_unquoted(r"\s*(>=|!=|<=|>|<|=|-|\+|\*|:|,|%|/|\||@|\^|&)\s*", r"\1", source)

11
obfuscator/visitors/statement_transformer.py

@ -235,8 +235,17 @@ class StatementTransformer(AstTransformer): @@ -235,8 +235,17 @@ class StatementTransformer(AstTransformer):
)
return changed
def visit_AugAssign(self, node: AugAssign) -> Any:
return self.visit(Assign(
targets=[node.target],
value=BinOp(left=node.target, op=node.op, right=node.value)
))
def visit_Nonlocal(self, node: Nonlocal) -> Any:
return Constant(value=-1, kind=Load())
def visit_Module(self, node: Module) -> Any:
body = [v
body = [Expr(v)
for z in [x if isinstance(x, list) else [x]
for x in map(self.visit, node.body)]
for v in z]

5
py_ctypes/__init__.py

@ -1,11 +1,12 @@ @@ -1,11 +1,12 @@
from ctype import CType, cprop
from ctype_view import get_view
from ctype_view import get_view, cast
from c_types.basic_types import *
from cpy_types.basic_types import *
from cpy_types.collection_types import *
from cpy_types.common_types import *
from cpy_types.meta_types import *
__all__ = ("CType", "cprop", "get_view",
__all__ = ("CType", "cprop", "get_view", "cast",
"CChar", "CShort", "CInt", "CLong", "CLongLong",
"CPyObject", "CPyVarObject", "CPyTypeObject")

150
py_ctypes/cpy_types/common_types.py

@ -0,0 +1,150 @@ @@ -0,0 +1,150 @@
from cpy_types.meta_types import CPyObject, CPyVarObject
from ctype import cprop, CType
class CPyCodeObject(CPyObject):
def __init__(self):
super().__init__()
self.co_argcount = cprop("co_argcount", "i")
self.co_posonlyargcount = cprop("co_posonlyargcount", "i")
self.co_kwonlyargcount = cprop("co_kwonlyargcount", "i")
self.co_nlocals = cprop("co_nlocals", "i")
self.co_stacksize = cprop("co_stacksize", "i")
self.co_flags = cprop("co_flags", "i")
self.co_firstlineno = cprop("co_firstlineno", "i")
self.co_code = cprop("co_code", "P[CPyBytesObject]")
self.co_consts = cprop("co_consts", "P[CPyTupleObject]")
self.co_names = cprop("co_names", "P[CPyTupleObject]")
self.co_varnames = cprop("co_varnames", "P[CPyTupleObject]")
self.co_freevars = cprop("co_freevars", "P[CPyTupleObject]")
self.co_cellvars = cprop("co_cellvars", "P[CPyTupleObject]")
self.co_cell2arg = cprop("co_cell2arg", "P[void]")
self.co_filename = cprop("co_filename", "P[CPyASCIIObject]")
self.co_name = cprop("co_name", "P[CPyASCIIObject]")
self.co_linetable = cprop("co_linetable", "P[CPyObject]")
self.co_zombieframe = cprop("co_zombieframe", "P[void]")
self.co_weakreflist = cprop("co_weakreflist", "P[CPyObject]")
self.co_extra = cprop("co_extra", "P[void]")
self.co_opcache_map = cprop("co_opcache_map", "P[CChar]")
self.co_opcache = cprop("co_opcache", "P[void]") # TODO
self.co_opcache_flag = cprop("co_opcache_flag", "i")
self.co_opcache_size = cprop("co_opcache_size", "c")
self.add_props(
self.co_argcount,
self.co_posonlyargcount,
self.co_kwonlyargcount,
self.co_nlocals,
self.co_stacksize,
self.co_flags,
self.co_firstlineno,
self.co_code,
self.co_consts,
self.co_names,
self.co_varnames,
self.co_freevars,
self.co_cellvars,
self.co_cell2arg,
self.co_filename,
self.co_name,
self.co_linetable,
self.co_zombieframe,
self.co_weakreflist,
self.co_extra,
self.co_opcache_map,
self.co_opcache,
self.co_opcache_flag,
self.co_opcache_size,
)
@classmethod
def get(cls, obj):
raise NotImplementedError
class CPyTryBlock(CType):
def __init__(self):
super().__init__()
self.b_type = cprop("b_type", "i")
self.b_handler = cprop("b_handler", "i")
self.b_level = cprop("b_level", "i")
self.add_props(
self.b_type,
self.b_handler,
self.b_level,
)
@classmethod
def get(cls, obj):
pass
class CPyFrameObject(CPyVarObject):
def __init__(self):
super().__init__()
self.f_back = cprop("f_back", "P[CPyFrameObject]")
self.f_code = cprop("f_code", "P[CPyCodeObject]")
self.f_builtins = cprop("f_builtins", "P[CPyDictObject]")
self.f_globals = cprop("f_globals", "P[CPyDictObject]")
self.f_locals = cprop("f_locals", "P[CPyObject]")
self.f_valuestack = cprop("f_valuestack", "P[void]")
self.f_trace = cprop("f_trace", "P[CPyObject]")
self.f_stackdepth = cprop("f_stackdepth", "i")
self.f_trace_lines = cprop("f_trace_lines", "c")
self.f_trace_opcodes = cprop("f_trace_opcodes", "c")
self.f_gen = cprop("f_gen", "P[CPyObject]")
self.f_lasti = cprop("f_lasti", "i")
self.f_lineno = cprop("f_lineno", "i")
self.f_iblock = cprop("f_iblock", "i")
self.f_state = cprop("f_state", "c")
self.f_blockstack = cprop("f_blockstack", "A[CPyTryBlock]20")
self.f_localsplus = cprop("f_localsplus", "A[void]S")
self.add_props(
self.f_back,
self.f_code,
self.f_builtins,
self.f_globals,
self.f_locals,
self.f_valuestack,
self.f_trace,
self.f_stackdepth,
self.f_trace_lines,
self.f_trace_opcodes,
self.f_gen,
self.f_lasti,
self.f_lineno,
self.f_iblock,
self.f_state,
self.f_blockstack,
self.f_localsplus,
)
@classmethod
def get(cls, obj):
raise NotImplementedError
class CPyTracebackObject(CPyObject):
def __init__(self):
super().__init__()
self.tb_next = cprop("tb_next", "P[CPyTracebackObject]")
self.tb_frame = cprop("tb_frame", "P[CPyFrameObject]")
self.tb_lasti = cprop("tb_lasti", "i")
self.tb_lineno = cprop("tb_lineno", "i")
self.add_props(
self.tb_next,
self.tb_frame,
self.tb_lasti,
self.tb_lineno,
)
@classmethod
def get(cls, obj):
raise NotImplementedError

2
py_ctypes/cpy_types/meta_types.py

@ -48,7 +48,7 @@ class CPyTypeObject(CPyVarObject): @@ -48,7 +48,7 @@ class CPyTypeObject(CPyVarObject):
def __init__(self):
super().__init__()
self.tp_name = cprop("tp_name", "P[CChar]")
self.tp_name = cprop("tp_name", "R[CChar]0")
self.tp_basicsize = cprop("tp_basicsize", "n")
self.tp_itemsize = cprop("tp_itemsize", "n")
self.tp_dealloc = cprop("tp_dealloc", "P[void]")

31
py_ctypes/ctype_view.py

@ -5,12 +5,18 @@ from typing import List, Any, TypeVar, Union @@ -5,12 +5,18 @@ from typing import List, Any, TypeVar, Union
from ctype import CType, _TYPES
from util import property
__all__ = ('get_view',)
__all__ = ('get_view', 'cast')
_MEMCACHE = {}
_TYPE_ARG = TypeVar('T', CType, CType) # Hacky mess to create <T extends CType>
def cast(obj_or_addr: Union[int, CType], type_: _TYPE_ARG) -> _TYPE_ARG:
addr = obj_or_addr if isinstance(obj_or_addr, int) else obj_or_addr._addr
del _MEMCACHE[addr]
return get_view(addr, type_)
def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG:
existing = _MEMCACHE.get(addr)
if existing:
@ -31,6 +37,7 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG: @@ -31,6 +37,7 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG:
_dict['_addr'] = addr
_dict['get_size'] = type_.__class__.get_size
_dict['_props'] = type_._props
@contextmanager
def _at(offset: int):
@ -61,9 +68,10 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG: @@ -61,9 +68,10 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG:
with _at(offset) as mem:
mem.write(data)
else:
subtype_ = prop._ctype[2:-2]
index = prop._ctype.find("]")
subtype_ = prop._ctype[2:index]
type_obj = _TYPES[subtype_] if subtype_ != "void" else None
end_marker = prop._ctype[-1]
end_marker = prop._ctype[index+1:]
if prop._ctype[0] == "R":
# Array Pointer
@ -98,7 +106,6 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG: @@ -98,7 +106,6 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG:
else:
# Array
# TODO: set Array size property
def fget(obj):
items = []
if end_marker == "0":
@ -116,6 +123,10 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG: @@ -116,6 +123,10 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG:
for i in range(obj.ob_size):
items.append(get_view(addr + offset + i * type_obj.get_size(), type_obj))
elif end_marker.isnumeric():
for i in range(int(end_marker)):
items.append(get_view(addr + offset + i * type_obj.get_size(), type_obj))
return items
def fset(obj, new_obj: List[CType]):
@ -141,7 +152,17 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG: @@ -141,7 +152,17 @@ def get_view(addr: int, type_: _TYPE_ARG) -> _TYPE_ARG:
type_str = ""
for i, prop in enumerate(_props):
stype = prop._ctype
if len(prop._ctype) > 1:
if prop._ctype[0] == "A":
index = prop._ctype.find("]")
subtype_ = prop._ctype[2:index]
end_marker = prop._ctype[index+1:]
if end_marker.isnumeric() and end_marker != "0":
stype = ''.join(p._ctype for c in _TYPES[subtype_]._props) * _TYPES[subtype_].get_size()
print(stype)
else:
# Assume at end of type
stype = "c"
elif len(prop._ctype) > 1:
stype = "P"
configure_prop(prop, offset)
pre_offset = calcsize(type_str+stype) - calcsize(stype) # hacky mess due to alignment

6
py_ctypes/test.py

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
from c_types.basic_types import *
from cpy_types.basic_types import *
from cpy_types.collection_types import *
from cpy_types.common_types import *
from cpy_types.meta_types import *
from ctype_view import get_view
@ -9,11 +10,16 @@ def main(): @@ -9,11 +10,16 @@ def main():
x = "totally a string"
y = 10
z = [10]
f = __import__('sys')._getframe(1)
x_as_pyascii = get_view(id(x), CPyASCIIObject.instance()) # This sets the offsets properly on PyASCIIObject
y_as_pylong = get_view(id(y), CPyLongObject.instance())
z_as_pylist = get_view(id(z), CPyListObject.instance())
f_as_pyframe = get_view(id(f), CPyFrameObject.instance())
print(f_as_pyframe.f_)
# print(''.join(CChar.get(c).decode() for c in f_as_pyfile.dict.ob_type))
print(CPyObject.by_reference(z_as_pylist) is z) # We get the exact same object back
offset = CPyASCIIObject.instance().wstr.offset_after # offset of content of PyASCIIObject

9
voice_tools/main.py

@ -1,9 +0,0 @@ @@ -1,9 +0,0 @@
import sounddevice
stream = sounddevice.RawStream(samplerate=48000, channels=2, latency=["low", "low"])
if __name__ == "__main__":
stream.start()
# TODO
stream.stop()
stream.close()

1
voice_tools/requirements.txt

@ -1 +0,0 @@ @@ -1 +0,0 @@
sounddevice
Loading…
Cancel
Save