mirror of
https://github.com/kforney/pentest-distro-builder.git
synced 2024-11-29 03:31:59 -07:00
651 lines
25 KiB
Python
651 lines
25 KiB
Python
import os
|
|
import io
|
|
import re
|
|
import sys
|
|
import json
|
|
import traceback
|
|
import platform
|
|
|
|
jediPreview = False
|
|
|
|
class RedirectStdout(object):
|
|
def __init__(self, new_stdout=None):
|
|
"""If stdout is None, redirect to /dev/null"""
|
|
self._new_stdout = new_stdout or open(os.devnull, 'w')
|
|
|
|
def __enter__(self):
|
|
sys.stdout.flush()
|
|
self.oldstdout_fno = os.dup(sys.stdout.fileno())
|
|
os.dup2(self._new_stdout.fileno(), 1)
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self._new_stdout.flush()
|
|
os.dup2(self.oldstdout_fno, 1)
|
|
os.close(self.oldstdout_fno)
|
|
|
|
class JediCompletion(object):
|
|
basic_types = {
|
|
'module': 'import',
|
|
'instance': 'variable',
|
|
'statement': 'value',
|
|
'param': 'variable',
|
|
}
|
|
|
|
def __init__(self):
|
|
self.default_sys_path = sys.path
|
|
self.environment = jedi.api.environment.Environment(sys.prefix, sys.executable)
|
|
self._input = io.open(sys.stdin.fileno(), encoding='utf-8')
|
|
if (os.path.sep == '/') and (platform.uname()[2].find('Microsoft') > -1):
|
|
# WSL; does not support UNC paths
|
|
self.drive_mount = '/mnt/'
|
|
elif sys.platform == 'cygwin':
|
|
# cygwin
|
|
self.drive_mount = '/cygdrive/'
|
|
else:
|
|
# Do no normalization, e.g. Windows build of Python.
|
|
# Could add additional test: ((os.path.sep == '/') and os.path.isdir('/mnt/c'))
|
|
# However, this may have more false positives trying to identify Windows/*nix hybrids
|
|
self.drive_mount = ''
|
|
|
|
def _get_definition_type(self, definition):
|
|
# if definition.type not in ['import', 'keyword'] and is_built_in():
|
|
# return 'builtin'
|
|
try:
|
|
if definition.type in ['statement'] and definition.name.isupper():
|
|
return 'constant'
|
|
return self.basic_types.get(definition.type, definition.type)
|
|
except Exception:
|
|
return 'builtin'
|
|
|
|
def _additional_info(self, completion):
|
|
"""Provide additional information about the completion object."""
|
|
if not hasattr(completion, '_definition') or completion._definition is None:
|
|
return ''
|
|
if completion.type == 'statement':
|
|
nodes_to_display = ['InstanceElement', 'String', 'Node', 'Lambda',
|
|
'Number']
|
|
return ''.join(c.get_code() for c in
|
|
completion._definition.children if type(c).__name__
|
|
in nodes_to_display).replace('\n', '')
|
|
return ''
|
|
|
|
@classmethod
|
|
def _get_top_level_module(cls, path):
|
|
"""Recursively walk through directories looking for top level module.
|
|
|
|
Jedi will use current filepath to look for another modules at same
|
|
path, but it will not be able to see modules **above**, so our goal
|
|
is to find the higher python module available from filepath.
|
|
"""
|
|
_path, _ = os.path.split(path)
|
|
if os.path.isfile(os.path.join(_path, '__init__.py')):
|
|
return cls._get_top_level_module(_path)
|
|
return path
|
|
|
|
def _generate_signature(self, completion):
|
|
"""Generate signature with function arguments.
|
|
"""
|
|
if completion.type in ['module'] or not hasattr(completion, 'params'):
|
|
return ''
|
|
return '%s(%s)' % (
|
|
completion.name,
|
|
', '.join(p.description[6:] for p in completion.params if p))
|
|
|
|
def _get_call_signatures(self, script):
|
|
"""Extract call signatures from jedi.api.Script object in failsafe way.
|
|
|
|
Returns:
|
|
Tuple with original signature object, name and value.
|
|
"""
|
|
_signatures = []
|
|
try:
|
|
call_signatures = script.call_signatures()
|
|
except KeyError:
|
|
call_signatures = []
|
|
except :
|
|
call_signatures = []
|
|
for signature in call_signatures:
|
|
for pos, param in enumerate(signature.params):
|
|
if not param.name:
|
|
continue
|
|
|
|
name = self._get_param_name(param)
|
|
if param.name == 'self' and pos == 0:
|
|
continue
|
|
if name.startswith('*'):
|
|
continue
|
|
|
|
value = self._get_param_value(param)
|
|
_signatures.append((signature, name, value))
|
|
return _signatures
|
|
|
|
def _get_param_name(self, p):
|
|
if(p.name.startswith('param ')):
|
|
return p.name[6:] # drop leading 'param '
|
|
return p.name
|
|
|
|
def _get_param_value(self, p):
|
|
pair = p.description.split('=')
|
|
if(len(pair) > 1):
|
|
return pair[1]
|
|
return None
|
|
|
|
def _get_call_signatures_with_args(self, script):
|
|
"""Extract call signatures from jedi.api.Script object in failsafe way.
|
|
|
|
Returns:
|
|
Array with dictionary
|
|
"""
|
|
_signatures = []
|
|
try:
|
|
call_signatures = script.call_signatures()
|
|
except KeyError:
|
|
call_signatures = []
|
|
for signature in call_signatures:
|
|
sig = {"name": "", "description": "", "docstring": "",
|
|
"paramindex": 0, "params": [], "bracketstart": []}
|
|
sig["description"] = signature.description
|
|
try:
|
|
sig["docstring"] = signature.docstring()
|
|
sig["raw_docstring"] = signature.docstring(raw=True)
|
|
except Exception:
|
|
sig["docstring"] = ''
|
|
sig["raw_docstring"] = ''
|
|
|
|
sig["name"] = signature.name
|
|
sig["paramindex"] = signature.index
|
|
sig["bracketstart"].append(signature.index)
|
|
|
|
_signatures.append(sig)
|
|
for pos, param in enumerate(signature.params):
|
|
if not param.name:
|
|
continue
|
|
|
|
name = self._get_param_name(param)
|
|
if param.name == 'self' and pos == 0:
|
|
continue
|
|
|
|
value = self._get_param_value(param)
|
|
paramDocstring = ''
|
|
try:
|
|
paramDocstring = param.docstring()
|
|
except Exception:
|
|
paramDocstring = ''
|
|
|
|
sig["params"].append({"name": name, "value": value, "docstring": paramDocstring, "description": param.description})
|
|
return _signatures
|
|
|
|
def _serialize_completions(self, script, identifier=None, prefix=''):
|
|
"""Serialize response to be read from VSCode.
|
|
|
|
Args:
|
|
script: Instance of jedi.api.Script object.
|
|
identifier: Unique completion identifier to pass back to VSCode.
|
|
prefix: String with prefix to filter function arguments.
|
|
Used only when fuzzy matcher turned off.
|
|
|
|
Returns:
|
|
Serialized string to send to VSCode.
|
|
"""
|
|
_completions = []
|
|
|
|
for signature, name, value in self._get_call_signatures(script):
|
|
if not self.fuzzy_matcher and not name.lower().startswith(
|
|
prefix.lower()):
|
|
continue
|
|
_completion = {
|
|
'type': 'property',
|
|
'raw_type': '',
|
|
'rightLabel': self._additional_info(signature)
|
|
}
|
|
_completion['description'] = ''
|
|
_completion['raw_docstring'] = ''
|
|
|
|
# we pass 'text' here only for fuzzy matcher
|
|
if value:
|
|
_completion['snippet'] = '%s=${1:%s}$0' % (name, value)
|
|
_completion['text'] = '%s=' % (name)
|
|
else:
|
|
_completion['snippet'] = '%s=$1$0' % name
|
|
_completion['text'] = name
|
|
_completion['displayText'] = name
|
|
_completions.append(_completion)
|
|
|
|
try:
|
|
completions = script.completions()
|
|
except KeyError:
|
|
completions = []
|
|
except :
|
|
completions = []
|
|
for completion in completions:
|
|
try:
|
|
_completion = {
|
|
'text': completion.name,
|
|
'type': self._get_definition_type(completion),
|
|
'raw_type': completion.type,
|
|
'rightLabel': self._additional_info(completion)
|
|
}
|
|
except Exception:
|
|
continue
|
|
|
|
for c in _completions:
|
|
if c['text'] == _completion['text']:
|
|
c['type'] = _completion['type']
|
|
c['raw_type'] = _completion['raw_type']
|
|
|
|
if any([c['text'].split('=')[0] == _completion['text']
|
|
for c in _completions]):
|
|
# ignore function arguments we already have
|
|
continue
|
|
_completions.append(_completion)
|
|
return json.dumps({'id': identifier, 'results': _completions})
|
|
|
|
def _serialize_methods(self, script, identifier=None, prefix=''):
|
|
_methods = []
|
|
try:
|
|
completions = script.completions()
|
|
except KeyError:
|
|
return []
|
|
|
|
for completion in completions:
|
|
if completion.name == '__autocomplete_python':
|
|
instance = completion.parent().name
|
|
break
|
|
else:
|
|
instance = 'self.__class__'
|
|
|
|
for completion in completions:
|
|
params = []
|
|
if hasattr(completion, 'params'):
|
|
params = [p.description for p in completion.params if p]
|
|
if completion.parent().type == 'class':
|
|
_methods.append({
|
|
'parent': completion.parent().name,
|
|
'instance': instance,
|
|
'name': completion.name,
|
|
'params': params,
|
|
'moduleName': completion.module_name,
|
|
'fileName': completion.module_path,
|
|
'line': completion.line,
|
|
'column': completion.column,
|
|
})
|
|
return json.dumps({'id': identifier, 'results': _methods})
|
|
|
|
def _serialize_arguments(self, script, identifier=None):
|
|
"""Serialize response to be read from VSCode.
|
|
|
|
Args:
|
|
script: Instance of jedi.api.Script object.
|
|
identifier: Unique completion identifier to pass back to VSCode.
|
|
|
|
Returns:
|
|
Serialized string to send to VSCode.
|
|
"""
|
|
return json.dumps({"id": identifier, "results": self._get_call_signatures_with_args(script)})
|
|
|
|
def _top_definition(self, definition):
|
|
for d in definition.goto_assignments():
|
|
if d == definition:
|
|
continue
|
|
if d.type == 'import':
|
|
return self._top_definition(d)
|
|
else:
|
|
return d
|
|
return definition
|
|
|
|
def _extract_range_jedi_0_11_1(self, definition):
|
|
from parso.utils import split_lines
|
|
# get the scope range
|
|
try:
|
|
if definition.type in ['class', 'function']:
|
|
tree_name = definition._name.tree_name
|
|
scope = tree_name.get_definition()
|
|
start_line = scope.start_pos[0] - 1
|
|
start_column = scope.start_pos[1]
|
|
# get the lines
|
|
code = scope.get_code(include_prefix=False)
|
|
lines = split_lines(code)
|
|
# trim the lines
|
|
lines = '\n'.join(lines).rstrip().split('\n')
|
|
end_line = start_line + len(lines) - 1
|
|
end_column = len(lines[-1]) - 1
|
|
else:
|
|
symbol = definition._name.tree_name
|
|
start_line = symbol.start_pos[0] - 1
|
|
start_column = symbol.start_pos[1]
|
|
end_line = symbol.end_pos[0] - 1
|
|
end_column = symbol.end_pos[1]
|
|
return {
|
|
'start_line': start_line,
|
|
'start_column': start_column,
|
|
'end_line': end_line,
|
|
'end_column': end_column
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'start_line': definition.line - 1,
|
|
'start_column': definition.column,
|
|
'end_line': definition.line - 1,
|
|
'end_column': definition.column
|
|
}
|
|
|
|
def _extract_range(self, definition):
|
|
"""Provides the definition range of a given definition
|
|
|
|
For regular symbols it returns the start and end location of the
|
|
characters making up the symbol.
|
|
|
|
For scoped containers it will return the entire definition of the
|
|
scope.
|
|
|
|
The scope that jedi provides ends with the first character of the next
|
|
scope so it's not ideal. For vscode we need the scope to end with the
|
|
last character of actual code. That's why we extract the lines that
|
|
make up our scope and trim the trailing whitespace.
|
|
"""
|
|
return self._extract_range_jedi_0_11_1(definition)
|
|
|
|
def _get_definitionsx(self, definitions, identifier=None, ignoreNoModulePath=False):
|
|
"""Serialize response to be read from VSCode.
|
|
|
|
Args:
|
|
definitions: List of jedi.api.classes.Definition objects.
|
|
identifier: Unique completion identifier to pass back to VSCode.
|
|
|
|
Returns:
|
|
Serialized string to send to VSCode.
|
|
"""
|
|
_definitions = []
|
|
for definition in definitions:
|
|
try:
|
|
if definition.type == 'import':
|
|
definition = self._top_definition(definition)
|
|
definitionRange = {
|
|
'start_line': 0,
|
|
'start_column': 0,
|
|
'end_line': 0,
|
|
'end_column': 0
|
|
}
|
|
module_path = ''
|
|
if hasattr(definition, 'module_path') and definition.module_path:
|
|
module_path = definition.module_path
|
|
definitionRange = self._extract_range(definition)
|
|
else:
|
|
if not ignoreNoModulePath:
|
|
continue
|
|
try:
|
|
parent = definition.parent()
|
|
container = parent.name if parent.type != 'module' else ''
|
|
except Exception:
|
|
container = ''
|
|
|
|
try:
|
|
docstring = definition.docstring()
|
|
rawdocstring = definition.docstring(raw=True)
|
|
except Exception:
|
|
docstring = ''
|
|
rawdocstring = ''
|
|
_definition = {
|
|
'text': definition.name,
|
|
'type': self._get_definition_type(definition),
|
|
'raw_type': definition.type,
|
|
'fileName': module_path,
|
|
'container': container,
|
|
'range': definitionRange,
|
|
'description': definition.description,
|
|
'docstring': docstring,
|
|
'raw_docstring': rawdocstring,
|
|
'signature': self._generate_signature(definition)
|
|
}
|
|
_definitions.append(_definition)
|
|
except Exception as e:
|
|
pass
|
|
return _definitions
|
|
|
|
def _serialize_definitions(self, definitions, identifier=None):
|
|
"""Serialize response to be read from VSCode.
|
|
|
|
Args:
|
|
definitions: List of jedi.api.classes.Definition objects.
|
|
identifier: Unique completion identifier to pass back to VSCode.
|
|
|
|
Returns:
|
|
Serialized string to send to VSCode.
|
|
"""
|
|
_definitions = []
|
|
for definition in definitions:
|
|
try:
|
|
if definition.module_path:
|
|
if definition.type == 'import':
|
|
definition = self._top_definition(definition)
|
|
if not definition.module_path:
|
|
continue
|
|
try:
|
|
parent = definition.parent()
|
|
container = parent.name if parent.type != 'module' else ''
|
|
except Exception:
|
|
container = ''
|
|
|
|
try:
|
|
docstring = definition.docstring()
|
|
rawdocstring = definition.docstring(raw=True)
|
|
except Exception:
|
|
docstring = ''
|
|
rawdocstring = ''
|
|
_definition = {
|
|
'text': definition.name,
|
|
'type': self._get_definition_type(definition),
|
|
'raw_type': definition.type,
|
|
'fileName': definition.module_path,
|
|
'container': container,
|
|
'range': self._extract_range(definition),
|
|
'description': definition.description,
|
|
'docstring': docstring,
|
|
'raw_docstring': rawdocstring
|
|
}
|
|
_definitions.append(_definition)
|
|
except Exception as e:
|
|
pass
|
|
return json.dumps({'id': identifier, 'results': _definitions})
|
|
|
|
def _serialize_tooltip(self, definitions, identifier=None):
|
|
_definitions = []
|
|
for definition in definitions:
|
|
signature = definition.name
|
|
description = None
|
|
if definition.type in ['class', 'function']:
|
|
signature = self._generate_signature(definition)
|
|
try:
|
|
description = definition.docstring(raw=True).strip()
|
|
except Exception:
|
|
description = ''
|
|
if not description and not hasattr(definition, 'get_line_code'):
|
|
# jedi returns an empty string for compiled objects
|
|
description = definition.docstring().strip()
|
|
if definition.type == 'module':
|
|
signature = definition.full_name
|
|
try:
|
|
description = definition.docstring(raw=True).strip()
|
|
except Exception:
|
|
description = ''
|
|
if not description and hasattr(definition, 'get_line_code'):
|
|
# jedi returns an empty string for compiled objects
|
|
description = definition.docstring().strip()
|
|
_definition = {
|
|
'type': self._get_definition_type(definition),
|
|
'text': definition.name,
|
|
'description': description,
|
|
'docstring': description,
|
|
'signature': signature
|
|
}
|
|
_definitions.append(_definition)
|
|
return json.dumps({'id': identifier, 'results': _definitions})
|
|
|
|
def _serialize_usages(self, usages, identifier=None):
|
|
_usages = []
|
|
for usage in usages:
|
|
_usages.append({
|
|
'name': usage.name,
|
|
'moduleName': usage.module_name,
|
|
'fileName': usage.module_path,
|
|
'line': usage.line,
|
|
'column': usage.column,
|
|
})
|
|
return json.dumps({'id': identifier, 'results': _usages})
|
|
|
|
def _deserialize(self, request):
|
|
"""Deserialize request from VSCode.
|
|
|
|
Args:
|
|
request: String with raw request from VSCode.
|
|
|
|
Returns:
|
|
Python dictionary with request data.
|
|
"""
|
|
return json.loads(request)
|
|
|
|
def _set_request_config(self, config):
|
|
"""Sets config values for current request.
|
|
|
|
This includes sys.path modifications which is getting restored to
|
|
default value on each request so each project should be isolated
|
|
from each other.
|
|
|
|
Args:
|
|
config: Dictionary with config values.
|
|
"""
|
|
sys.path = self.default_sys_path
|
|
self.use_snippets = config.get('useSnippets')
|
|
self.show_doc_strings = config.get('showDescriptions', True)
|
|
self.fuzzy_matcher = config.get('fuzzyMatcher', False)
|
|
jedi.settings.case_insensitive_completion = config.get(
|
|
'caseInsensitiveCompletion', True)
|
|
for path in config.get('extraPaths', []):
|
|
if path and path not in sys.path:
|
|
sys.path.insert(0, path)
|
|
|
|
def _normalize_request_path(self, request):
|
|
"""Normalize any Windows paths received by a *nix build of
|
|
Python. Does not alter the reverse os.path.sep=='\\',
|
|
i.e. *nix paths received by a Windows build of Python.
|
|
"""
|
|
if 'path' in request:
|
|
if not self.drive_mount:
|
|
return
|
|
newPath = request['path'].replace('\\', '/')
|
|
if newPath[0:1] == '/':
|
|
# is absolute path with no drive letter
|
|
request['path'] = newPath
|
|
elif newPath[1:2] == ':':
|
|
# is path with drive letter, only absolute can be mapped
|
|
request['path'] = self.drive_mount + newPath[0:1].lower() + newPath[2:]
|
|
else:
|
|
# is relative path
|
|
request['path'] = newPath
|
|
|
|
def _process_request(self, request):
|
|
"""Accept serialized request from VSCode and write response.
|
|
"""
|
|
request = self._deserialize(request)
|
|
|
|
self._set_request_config(request.get('config', {}))
|
|
|
|
self._normalize_request_path(request)
|
|
path = self._get_top_level_module(request.get('path', ''))
|
|
if len(path) > 0 and path not in sys.path:
|
|
sys.path.insert(0, path)
|
|
lookup = request.get('lookup', 'completions')
|
|
|
|
if lookup == 'names':
|
|
return self._serialize_definitions(
|
|
jedi.api.names(
|
|
source=request.get('source', None),
|
|
path=request.get('path', ''),
|
|
all_scopes=True),
|
|
request['id'])
|
|
|
|
script = jedi.Script(
|
|
source=request.get('source', None), line=request['line'] + 1,
|
|
column=request['column'], path=request.get('path', ''),
|
|
sys_path=sys.path, environment=self.environment)
|
|
|
|
if lookup == 'definitions':
|
|
defs = self._get_definitionsx(script.goto_assignments(follow_imports=True), request['id'])
|
|
return json.dumps({'id': request['id'], 'results': defs})
|
|
if lookup == 'tooltip':
|
|
if jediPreview:
|
|
defs = []
|
|
try:
|
|
defs = self._get_definitionsx(script.goto_definitions(), request['id'], True)
|
|
except:
|
|
pass
|
|
try:
|
|
if len(defs) == 0:
|
|
defs = self._get_definitionsx(script.goto_assignments(), request['id'], True)
|
|
except:
|
|
pass
|
|
return json.dumps({'id': request['id'], 'results': defs})
|
|
else:
|
|
try:
|
|
return self._serialize_tooltip(script.goto_definitions(), request['id'])
|
|
except:
|
|
return json.dumps({'id': request['id'], 'results': []})
|
|
elif lookup == 'arguments':
|
|
return self._serialize_arguments(
|
|
script, request['id'])
|
|
elif lookup == 'usages':
|
|
return self._serialize_usages(
|
|
script.usages(), request['id'])
|
|
elif lookup == 'methods':
|
|
return self._serialize_methods(script, request['id'],
|
|
request.get('prefix', ''))
|
|
else:
|
|
return self._serialize_completions(script, request['id'],
|
|
request.get('prefix', ''))
|
|
|
|
def _write_response(self, response):
|
|
sys.stdout.write(response + '\n')
|
|
sys.stdout.flush()
|
|
|
|
def watch(self):
|
|
while True:
|
|
try:
|
|
rq = self._input.readline()
|
|
if len(rq) == 0:
|
|
# Reached EOF - indication our parent process is gone.
|
|
sys.stderr.write('Received EOF from the standard input,exiting' + '\n')
|
|
sys.stderr.flush()
|
|
return
|
|
with RedirectStdout():
|
|
response = self._process_request(rq)
|
|
self._write_response(response)
|
|
|
|
except Exception:
|
|
sys.stderr.write(traceback.format_exc() + '\n')
|
|
sys.stderr.flush()
|
|
|
|
if __name__ == '__main__':
|
|
cachePrefix = 'v'
|
|
modulesToLoad = ''
|
|
if len(sys.argv) > 2 and sys.argv[1] == 'custom':
|
|
jediPath = sys.argv[2]
|
|
jediPreview = True
|
|
cachePrefix = 'custom_v'
|
|
if len(sys.argv) > 3:
|
|
modulesToLoad = sys.argv[3]
|
|
else:
|
|
#release
|
|
jediPath = os.path.dirname(__file__)
|
|
if len(sys.argv) > 1:
|
|
modulesToLoad = sys.argv[1]
|
|
|
|
sys.path.insert(0, jediPath)
|
|
import jedi
|
|
if jediPreview:
|
|
jedi.settings.cache_directory = os.path.join(
|
|
jedi.settings.cache_directory, cachePrefix + jedi.__version__.replace('.', ''))
|
|
# remove jedi from path after we import it so it will not be completed
|
|
sys.path.pop(0)
|
|
if len(modulesToLoad) > 0:
|
|
jedi.preload_module(*modulesToLoad.split(','))
|
|
JediCompletion().watch()
|