123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- """ Meager code path measurement tool.
- Ned Batchelder
- http://nedbatchelder.com/blog/200803/python_code_complexity_microtool.html
- MIT License.
- """
- from __future__ import with_statement
- import optparse
- import sys
- import tokenize
- from collections import defaultdict
- try:
- import ast
- from ast import iter_child_nodes
- except ImportError: # Python 2.5
- from flake8.util import ast, iter_child_nodes
- __version__ = '0.6.1'
- class ASTVisitor(object):
- """Performs a depth-first walk of the AST."""
- def __init__(self):
- self.node = None
- self._cache = {}
- def default(self, node, *args):
- for child in iter_child_nodes(node):
- self.dispatch(child, *args)
- def dispatch(self, node, *args):
- self.node = node
- klass = node.__class__
- meth = self._cache.get(klass)
- if meth is None:
- className = klass.__name__
- meth = getattr(self.visitor, 'visit' + className, self.default)
- self._cache[klass] = meth
- return meth(node, *args)
- def preorder(self, tree, visitor, *args):
- """Do preorder walk of tree using visitor"""
- self.visitor = visitor
- visitor.visit = self.dispatch
- self.dispatch(tree, *args) # XXX *args make sense?
- class PathNode(object):
- def __init__(self, name, look="circle"):
- self.name = name
- self.look = look
- def to_dot(self):
- print('node [shape=%s,label="%s"] %d;' % (
- self.look, self.name, self.dot_id()))
- def dot_id(self):
- return id(self)
- class PathGraph(object):
- def __init__(self, name, entity, lineno, column=0):
- self.name = name
- self.entity = entity
- self.lineno = lineno
- self.column = column
- self.nodes = defaultdict(list)
- def connect(self, n1, n2):
- self.nodes[n1].append(n2)
- # Ensure that the destination node is always counted.
- self.nodes[n2] = []
- def to_dot(self):
- print('subgraph {')
- for node in self.nodes:
- node.to_dot()
- for node, nexts in self.nodes.items():
- for next in nexts:
- print('%s -- %s;' % (node.dot_id(), next.dot_id()))
- print('}')
- def complexity(self):
- """ Return the McCabe complexity for the graph.
- V-E+2
- """
- num_edges = sum([len(n) for n in self.nodes.values()])
- num_nodes = len(self.nodes)
- return num_edges - num_nodes + 2
- class PathGraphingAstVisitor(ASTVisitor):
- """ A visitor for a parsed Abstract Syntax Tree which finds executable
- statements.
- """
- def __init__(self):
- super(PathGraphingAstVisitor, self).__init__()
- self.classname = ""
- self.graphs = {}
- self.reset()
- def reset(self):
- self.graph = None
- self.tail = None
- def dispatch_list(self, node_list):
- for node in node_list:
- self.dispatch(node)
- def visitFunctionDef(self, node):
- if self.classname:
- entity = '%s%s' % (self.classname, node.name)
- else:
- entity = node.name
- name = '%d:%d: %r' % (node.lineno, node.col_offset, entity)
- if self.graph is not None:
- # closure
- pathnode = self.appendPathNode(name)
- self.tail = pathnode
- self.dispatch_list(node.body)
- bottom = PathNode("", look='point')
- self.graph.connect(self.tail, bottom)
- self.graph.connect(pathnode, bottom)
- self.tail = bottom
- else:
- self.graph = PathGraph(name, entity, node.lineno, node.col_offset)
- pathnode = PathNode(name)
- self.tail = pathnode
- self.dispatch_list(node.body)
- self.graphs["%s%s" % (self.classname, node.name)] = self.graph
- self.reset()
- visitAsyncFunctionDef = visitFunctionDef
- def visitClassDef(self, node):
- old_classname = self.classname
- self.classname += node.name + "."
- self.dispatch_list(node.body)
- self.classname = old_classname
- def appendPathNode(self, name):
- if not self.tail:
- return
- pathnode = PathNode(name)
- self.graph.connect(self.tail, pathnode)
- self.tail = pathnode
- return pathnode
- def visitSimpleStatement(self, node):
- if node.lineno is None:
- lineno = 0
- else:
- lineno = node.lineno
- name = "Stmt %d" % lineno
- self.appendPathNode(name)
- def default(self, node, *args):
- if isinstance(node, ast.stmt):
- self.visitSimpleStatement(node)
- else:
- super(PathGraphingAstVisitor, self).default(node, *args)
- def visitLoop(self, node):
- name = "Loop %d" % node.lineno
- self._subgraph(node, name)
- visitAsyncFor = visitFor = visitWhile = visitLoop
- def visitIf(self, node):
- name = "If %d" % node.lineno
- self._subgraph(node, name)
- def _subgraph(self, node, name, extra_blocks=()):
- """create the subgraphs representing any `if` and `for` statements"""
- if self.graph is None:
- # global loop
- self.graph = PathGraph(name, name, node.lineno, node.col_offset)
- pathnode = PathNode(name)
- self._subgraph_parse(node, pathnode, extra_blocks)
- self.graphs["%s%s" % (self.classname, name)] = self.graph
- self.reset()
- else:
- pathnode = self.appendPathNode(name)
- self._subgraph_parse(node, pathnode, extra_blocks)
- def _subgraph_parse(self, node, pathnode, extra_blocks):
- """parse the body and any `else` block of `if` and `for` statements"""
- loose_ends = []
- self.tail = pathnode
- self.dispatch_list(node.body)
- loose_ends.append(self.tail)
- for extra in extra_blocks:
- self.tail = pathnode
- self.dispatch_list(extra.body)
- loose_ends.append(self.tail)
- if node.orelse:
- self.tail = pathnode
- self.dispatch_list(node.orelse)
- loose_ends.append(self.tail)
- else:
- loose_ends.append(pathnode)
- if pathnode:
- bottom = PathNode("", look='point')
- for le in loose_ends:
- self.graph.connect(le, bottom)
- self.tail = bottom
- def visitTryExcept(self, node):
- name = "TryExcept %d" % node.lineno
- self._subgraph(node, name, extra_blocks=node.handlers)
- visitTry = visitTryExcept
- def visitWith(self, node):
- name = "With %d" % node.lineno
- self.appendPathNode(name)
- self.dispatch_list(node.body)
- visitAsyncWith = visitWith
- class McCabeChecker(object):
- """McCabe cyclomatic complexity checker."""
- name = 'mccabe'
- version = __version__
- _code = 'C901'
- _error_tmpl = "C901 %r is too complex (%d)"
- max_complexity = -1
- def __init__(self, tree, filename):
- self.tree = tree
- @classmethod
- def add_options(cls, parser):
- flag = '--max-complexity'
- kwargs = {
- 'default': -1,
- 'action': 'store',
- 'type': 'int',
- 'help': 'McCabe complexity threshold',
- 'parse_from_config': 'True',
- }
- config_opts = getattr(parser, 'config_options', None)
- if isinstance(config_opts, list):
- # Flake8 2.x
- kwargs.pop('parse_from_config')
- parser.add_option(flag, **kwargs)
- parser.config_options.append('max-complexity')
- else:
- parser.add_option(flag, **kwargs)
- @classmethod
- def parse_options(cls, options):
- cls.max_complexity = int(options.max_complexity)
- def run(self):
- if self.max_complexity < 0:
- return
- visitor = PathGraphingAstVisitor()
- visitor.preorder(self.tree, visitor)
- for graph in visitor.graphs.values():
- if graph.complexity() > self.max_complexity:
- text = self._error_tmpl % (graph.entity, graph.complexity())
- yield graph.lineno, graph.column, text, type(self)
- def get_code_complexity(code, threshold=7, filename='stdin'):
- try:
- tree = compile(code, filename, "exec", ast.PyCF_ONLY_AST)
- except SyntaxError:
- e = sys.exc_info()[1]
- sys.stderr.write("Unable to parse %s: %s\n" % (filename, e))
- return 0
- complx = []
- McCabeChecker.max_complexity = threshold
- for lineno, offset, text, check in McCabeChecker(tree, filename).run():
- complx.append('%s:%d:1: %s' % (filename, lineno, text))
- if len(complx) == 0:
- return 0
- print('\n'.join(complx))
- return len(complx)
- def get_module_complexity(module_path, threshold=7):
- """Returns the complexity of a module"""
- with open(module_path, "rU") as mod:
- code = mod.read()
- return get_code_complexity(code, threshold, filename=module_path)
- def _read(filename):
- if (2, 5) < sys.version_info < (3, 0):
- with open(filename, 'rU') as f:
- return f.read()
- elif (3, 0) <= sys.version_info < (4, 0):
- """Read the source code."""
- try:
- with open(filename, 'rb') as f:
- (encoding, _) = tokenize.detect_encoding(f.readline)
- except (LookupError, SyntaxError, UnicodeError):
- # Fall back if file encoding is improperly declared
- with open(filename, encoding='latin-1') as f:
- return f.read()
- with open(filename, 'r', encoding=encoding) as f:
- return f.read()
- def main(argv=None):
- if argv is None:
- argv = sys.argv[1:]
- opar = optparse.OptionParser()
- opar.add_option("-d", "--dot", dest="dot",
- help="output a graphviz dot file", action="store_true")
- opar.add_option("-m", "--min", dest="threshold",
- help="minimum complexity for output", type="int",
- default=1)
- options, args = opar.parse_args(argv)
- code = _read(args[0])
- tree = compile(code, args[0], "exec", ast.PyCF_ONLY_AST)
- visitor = PathGraphingAstVisitor()
- visitor.preorder(tree, visitor)
- if options.dot:
- print('graph {')
- for graph in visitor.graphs.values():
- if (not options.threshold or
- graph.complexity() >= options.threshold):
- graph.to_dot()
- print('}')
- else:
- for graph in visitor.graphs.values():
- if graph.complexity() >= options.threshold:
- print(graph.name, graph.complexity())
- if __name__ == '__main__':
- main(sys.argv[1:])
|