import time
from abc import abstractmethod
from typing import Optional, List

from antlr4 import CommonTokenStream, InputStream, Lexer, Parser, ParserATNSimulator, ParserRuleContext, \
    PredictionContextCache, Recognizer, Token, TokenStream
from antlr4.error.ErrorListener import ErrorListener
from pylasu.model import walk, Source, Position, Node, Point
from pylasu.model.processing import assign_parents
from pylasu.parsing.parse_tree import token_end_point
from pylasu.parsing.results import ParsingResultWithFirstStage, FirstStageParsingResult
from pylasu.validation import Issue, IssueType
| 14 | + |
class PylasuANTLRParser:
    """A complete description of a multi-stage ANTLR-based parser, from source code to AST.

    You should extend this class to implement the parts that are specific to your language.

    Note: instances of this class are thread-safe and they're meant to be reused. Do not create a new PylasuANTLRParser
    instance every time you need to parse some source code, or performance may suffer."""

    def __init__(self):
        # Shared across all parsers created by this instance, so ANTLR prediction results
        # are reused between parses instead of being recomputed each time.
        self.prediction_context_cache = PredictionContextCache()

    def parse(self, input_stream: InputStream, consider_range: bool = True, measure_lexing_time: bool = False,
              source: Optional[Source] = None):
        """Parses source code, returning a result that includes an AST and a collection of parse issues
        (errors, warnings).
        The parsing is done in accordance to the StarLasu methodology i.e. a first-stage parser builds a parse tree
        which is then mapped onto a higher-level tree called the AST.
        :param input_stream: the source code.
        :param consider_range: if true (the default), parsed AST nodes record their position in the input text.
        :param measure_lexing_time: if true, the result will include a measurement of the time spent in lexing i.e.
            breaking the input stream into tokens.
        :param source: the (optional) Source the code comes from, recorded in the parsing results."""
        start = time.time_ns()
        # Forward `source` so the first-stage result records it too (it was previously always None there).
        first_stage = self.parse_first_stage(input_stream, measure_lexing_time, source)
        issues = first_stage.issues
        ast = self.parse_tree_to_ast(first_stage.root, consider_range, issues, source)
        self.assign_parents(ast)
        ast = self.post_process_ast(ast, issues) if ast else ast
        if ast and not consider_range:
            # Remove parseTreeNodes because they cause the range to be computed
            for node in walk(ast):
                node.origin = None
        now = time.time_ns()
        return ParsingResultWithFirstStage(
            issues,
            ast,
            input_stream.getText(0, input_stream.index + 1),
            (now - start) // 1_000_000,
            first_stage,
            source,
        )

    def parse_first_stage(
            self, input_stream: InputStream, measure_lexing_time: bool = False, source: Optional[Source] = None
    ) -> FirstStageParsingResult:
        """Executes only the first stage of the parser, i.e., the production of a parse tree. Usually, you'll want to
        use the [parse] method, that returns an AST which is simpler to use and query.
        :param input_stream: the source code.
        :param measure_lexing_time: if true, tokenize the whole input up front and report the time spent doing so.
        :param source: the (optional) Source the code comes from, recorded in the result."""
        issues: List[Issue] = []
        lexing_time: Optional[int] = None
        total_time: int = time.time_ns()
        parser = self.create_parser(input_stream, issues)
        if measure_lexing_time:
            token_stream = parser.getInputStream()
            if isinstance(token_stream, CommonTokenStream):
                # Force the lexer to produce every token, then rewind, so lexing cost is isolated
                # from the parsing that follows.
                lexing_time = time.time_ns()
                token_stream.fill()
                token_stream.seek(0)
                lexing_time = (time.time_ns() - lexing_time) // 1_000_000
        root = self.invoke_root_rule(parser)
        if root:
            self.verify_parse_tree(parser, issues, root)
        total_time = (time.time_ns() - total_time) // 1_000_000
        return FirstStageParsingResult(issues, root, None, total_time, lexing_time, source)

    def create_parser(self, input_stream: InputStream, issues: List[Issue]) -> Parser:
        """Creates the first-stage parser, wired to collect lexing and parsing issues into `issues`."""
        lexer = self.create_antlr_lexer(input_stream)
        self.attach_listeners(lexer, issues)
        token_stream = self.create_token_stream(lexer)
        parser = self.create_antlr_parser(token_stream)
        # Assign interpreter to avoid caching DFA states indefinitely across executions
        parser._interp = \
            ParserATNSimulator(parser, parser.atn, parser._interp.decisionToDFA, self.prediction_context_cache)
        self.attach_listeners(parser, issues)
        return parser

    def invoke_root_rule(self, parser: Parser):
        """Invokes the parser's root rule, i.e., the method which is responsible for parsing the entire input.
        Usually this is the topmost rule, the one with index 0 (as also assumed by other libraries such as antlr4-c3),
        so this method invokes that rule. If your grammar/parser is structured differently, or if you're using this to
        parse only a portion of the input or a subset of the language, you have to override this method to invoke the
        correct entry point."""
        return getattr(parser, parser.ruleNames[0])()

    def verify_parse_tree(self, parser: Parser, issues: List[Issue], root: ParserRuleContext):
        """Checks the parse tree for correctness.
        If you're concerned about performance, you may want to override this to do nothing."""
        token_stream = parser.getTokenStream()
        last_token: Token = token_stream.get(token_stream.index)
        if last_token.type != Token.EOF:
            # Parsing stopped before EOF: report a zero-width issue at the end of the last consumed token.
            end_point = token_end_point(last_token)
            issues.append(
                Issue(
                    IssueType.SYNTACTIC,
                    "The whole input was not consumed",
                    position=Position(end_point, end_point)
                )
            )
        # TODO Kolasu also traverses the parse tree searching for exceptions

    def assign_parents(self, ast):
        """Sets the parent link on every node of the AST (no-op when the AST is None)."""
        if ast:
            assign_parents(ast)

    def post_process_ast(self, ast, issues):
        """Hook for subclasses to refine the AST after construction; the default is the identity."""
        return ast

    def create_token_stream(self, lexer: Lexer) -> TokenStream:
        """Builds the token stream fed to the parser; override to customize (e.g. channels)."""
        return CommonTokenStream(lexer)

    @abstractmethod
    def create_antlr_lexer(self, input_stream: InputStream):
        """Creates the lexer."""
        pass

    @abstractmethod
    def create_antlr_parser(self, token_stream: TokenStream):
        """Creates the first-stage parser."""
        pass

    @abstractmethod
    def parse_tree_to_ast(self, root, consider_range: bool, issues: List[Issue], source: Source) -> Optional[Node]:
        """Maps the parse tree onto the AST, recording any mapping issues into `issues`."""
        pass

    def attach_listeners(self, recognizer: Recognizer, issues: List[Issue]):
        """Replaces the default ANTLR error listeners with one that records issues into `issues`."""
        recognizer.removeErrorListeners()
        recognizer.addErrorListener(ParserErrorListener(issues))
| 139 | + |
| 140 | + |
class ParserErrorListener(ErrorListener):
    """ANTLR error listener that records each syntax error as a Pylasu Issue in a shared list."""

    def __init__(self, issues: List[Issue]):
        self.issues = issues

    def syntaxError(self, recognizer, offending_symbol, line, column, msg, e):
        begin = Point(line, column)
        if isinstance(offending_symbol, Token):
            end = token_end_point(offending_symbol)
        else:
            end = begin
        message = msg if msg else "unspecified"
        issue = Issue(IssueType.SYNTACTIC, message, position=Position(begin, end))
        self.issues.append(issue)
0 commit comments