# matcher.py -- forked from idank/explainshell
import collections, logging, itertools
import bashlex.parser
import bashlex.ast
from explainshell import errors, util, helpconstants
class matchgroup(object):
    '''a named bucket of matchresults

    every shell result ends up in one shared group, while each command gets
    a group of its own'''

    def __init__(self, name):
        # starts out empty, results are appended by whoever owns the group
        self.results = []
        self.name = name

    def __repr__(self):
        count = len(self.results)
        return '<matchgroup %r with %d results>' % (self.name, count)
class matchresult(collections.namedtuple('matchresult',
                                         ['start', 'end', 'text', 'match'])):
    '''a (start, end, text, match) tuple describing one explained span'''

    @property
    def unknown(self):
        '''True when there's no help text for this result (text is None)'''
        helptext = self.text
        return helptext is None
# a span of the input that came out of a word expansion, tagged with the
# kind of expansion
matchwordexpansion = collections.namedtuple(
    'matchwordexpansion', ['start', 'end', 'kind'])

# module-level logger, named after this module
logger = logging.getLogger(__name__)
class matcher(bashlex.ast.nodevisitor):
'''parse a command line and return a list of matchresults describing
each token.
'''
def __init__(self, s, store):
    '''prepare to match the command line s, looking up man pages in store'''
    self.store = store

    # bashlex doesn't seem to like non English text, so "sanitize" the
    # input by round-tripping it through latin1: anything that can't be
    # encoded becomes '?' and we end up with an str again
    sanitized = s.encode('latin1', 'replace')
    self.s = sanitized.decode('latin1')

    # the option found for the word being matched and for the one before it
    self._currentoption = None
    self._prevoption = None

    # the first group always collects the shell results
    shellgroup = matchgroup('shell')
    self.groups = [shellgroup]

    # matchwordexpansions recorded while visiting word expansions
    self.expansions = []

    # stack of nested command groups. starting a simple command pushes a
    # tuple made of:
    # - the node that opened the group, which is how the group is found
    #   again when the command ends (see visitnodeend)
    # - the matchgroup that new matchresults are added to
    # - a word that ends the top-most command, needed when a flag starts
    #   a new command, e.g. find -exec
    self.groupstack = [(None, shellgroup, None)]

    # the compound commands (if/for..) we're currently inside of. reserved
    # words need this context since a keyword such as 'done' can show up
    # in a for, while..
    self.compoundstack = []

    # names of the functions defined in the current input. commands are
    # checked against them so a call to a defined function doesn't show
    # up as unknown and isn't taken from the db
    self.functions = set()
def _generatecommandgroupname(self):
    '''return the name for the next command group: 'command' followed by
    the number of groups whose name already starts with 'command' '''
    taken = sum(1 for group in self.groups
                if group.name.startswith('command'))
    return 'command%d' % taken
@property
def matches(self):
    '''the results list of the group on top of the group stack, i.e. the
    most recently created group that hasn't ended yet'''
    top = self.groupstack[-1]
    currentgroup = top[1]
    return currentgroup.results
@property
def allmatches(self):
    '''a new list holding the results of every group, in group order'''
    flattened = []
    for group in self.groups:
        flattened.extend(group.results)
    return flattened
@property
def manpage(self):
    '''the manpage of the group on top of the group stack, or None when
    that group is the shell group'''
    current = self.groupstack[-1][1]
    # the shell group has no manpage. it can be on top of the stack if the
    # first argument is a command substitution and we're not treating it
    # as a "man page not found"
    if current.name == 'shell':
        return None
    return current.manpage
def find_option(self, opt):
    '''look up opt in the current manpage, remember the outcome as the
    current option and return it'''
    found = self.manpage.find_option(opt)
    self._currentoption = found
    logger.debug('looking up option %r, got %r', opt, found)
    return found
def findmanpages(self, prog):
    '''ask the store for the man pages of prog and return its answer as is

    the first entry is logged as the one being used, so the answer has to
    hold at least one man page'''
    logger.info('looking up %r in store', prog)
    found = self.store.findmanpage(prog)
    first = found[0]
    logger.info('found %r in store, got: %r, using %r', prog, found, first)
    return found
def unknown(self, token, start, end):
    '''return a matchresult spanning start..end that has no help text,
    which is what marks it as unknown. token is only used for logging'''
    logger.debug('nothing to do with token %r', token)
    return matchresult(start=start, end=end, text=None, match=None)
def visitreservedword(self, node, word):
    '''add the help text of a reserved word to the shell group

    text that is specific to the compound command we're inside of wins,
    helpconstants.RESERVEDWORDS is the fallback'''
    specific = None
    if self.compoundstack:
        percompound = helpconstants.COMPOUNDRESERVEDWORDS.get(
            self.compoundstack[-1], {})
        specific = percompound.get(word)

    # nothing specific to the current compound command, use the general text
    helptext = specific if specific else helpconstants.RESERVEDWORDS[word]

    explained = matchresult(node.pos[0], node.pos[1], helptext, None)
    self.groups[0].results.append(explained)
def visitoperator(self, node, op):
    '''add the help text of an operator to the shell group

    the compound command we're inside of gets the first say, with
    helpconstants.OPERATORS as the fallback'''
    specific = None
    if self.compoundstack:
        innermost = self.compoundstack[-1]
        candidates = helpconstants.COMPOUNDRESERVEDWORDS.get(innermost, {})
        specific = candidates.get(op)

    helptext = specific or helpconstants.OPERATORS[op]
    shellresults = self.groups[0].results
    shellresults.append(matchresult(node.pos[0], node.pos[1], helptext, None))
def visitpipe(self, node, pipe):
    '''add the pipeline help text for the span of node to the shell group'''
    start, end = node.pos[0], node.pos[1]
    explained = matchresult(start, end, helpconstants.PIPELINES, None)
    self.groups[0].results.append(explained)
def visitredirect(self, node, input, type, output, heredoc):
    '''add the help text of a redirection to the shell group and visit the
    expansions of its output, if there are any

    returns False so the children of node aren't visited'''
    kind = type
    # a '>&' whose output is an int is looked up without the trailing '&'
    if kind == '>&' and isinstance(output, int):
        kind = kind[:-1]

    paragraphs = [helpconstants.REDIRECTION]
    if kind in helpconstants.REDIRECTION_KIND:
        paragraphs.append(helpconstants.REDIRECTION_KIND[kind])

    explained = matchresult(node.pos[0], node.pos[1],
                            '\n\n'.join(paragraphs), None)
    self.groups[0].results.append(explained)

    # output may be a wordnode. visiting it would confuse the matcher into
    # thinking it's an argument, so only its expansions are visited
    if isinstance(output, bashlex.ast.node):
        for expansion in output.parts:
            self.visit(expansion)

    return False
def visitcommand(self, node, parts):
    '''start matching a simple command

    a command whose first word names a function defined earlier in the
    input is explained as a function call and False is returned so its
    children aren't visited. anything else is handed to startcommand'''
    assert parts

    # the first WordNode isn't necessarily parts[0]
    idxwordnode = bashlex.ast.findfirstkind(parts, 'word')
    if idxwordnode == -1:
        logger.info('no words found in command (probably contains only redirects)')
        return

    wordnode = parts[idxwordnode]
    if wordnode.word not in self.functions:
        self.startcommand(node, parts, None)
        return

    # this refers to a previously defined function
    logger.info('word %r is a function, not trying to match it or its '
                'arguments', wordnode)

    # the call itself comes first
    calltext = helpconstants._functioncall % wordnode.word
    self.matches.append(
        matchresult(wordnode.pos[0], wordnode.pos[1], calltext, None))

    # visiting the command the usual way would try to match it against a
    # manpage, which we don't have. instead every other word is considered
    # an argument of the function call
    for part in parts:
        if part.kind != 'word':
            # maybe it's a redirect...
            self.visit(part)
            continue
        if part is wordnode:
            continue

        argtext = helpconstants._functionarg % wordnode.word
        self.matches.append(
            matchresult(part.pos[0], part.pos[1], argtext, None))

        # the argument might hold expansions, visit those
        for expansion in part.parts:
            self.visit(expansion)

    # nothing left to do for this commandnode, skip its children
    return False
def visitif(self, *args):
    '''remember that we're inside an if until visitnodeend pops it'''
    entered = 'if'
    self.compoundstack.append(entered)
def visitfor(self, node, parts):
    '''enter a for loop: its words are explained as part of the for
    construct, everything else is visited normally

    returns False since the parts are visited right here'''
    self.compoundstack.append('for')

    shellresults = self.groups[0].results
    for part in parts:
        if part.kind != 'word':
            self.visit(part)
            continue

        # words aren't part of the current command, so they aren't visited
        # but explained as belonging to the for construct
        shellresults.append(
            matchresult(part.pos[0], part.pos[1], helpconstants._for, None))

        # their expansions still need a visit though
        for expansion in part.parts:
            self.visit(expansion)

    return False
def visitwhile(self, *args):
    '''remember that we're inside a while until visitnodeend pops it'''
    entered = 'while'
    self.compoundstack.append(entered)
def visituntil(self, *args):
    '''remember that we're inside an until until visitnodeend pops it'''
    entered = 'until'
    self.compoundstack.append(entered)
def visitnodeend(self, node):
    '''undo the bookkeeping done when node was entered: end the command
    groups of a command or pop a compound command off the compound stack'''
    kind = node.kind

    if kind == 'command':
        # a command made of redirects only never pushed a group, in which
        # case just the shell group is on the stack
        if len(self.groupstack) <= 1:
            return

        logger.info('visitnodeend %r, groups %d', node, len(self.groupstack))
        while self.groupstack[-1][0] is not node:
            logger.info('popping groups that are a result of nested commands')
            self.endcommand()
        # and finally the group that node itself started
        self.endcommand()
        return

    if kind in ('if', 'for', 'while', 'until'):
        popped = self.compoundstack.pop()
        assert popped == node.kind
def startcommand(self, commandnode, parts, endword, addgroup=True):
    '''try to start a new command group for the first word in parts

    commandnode and endword are stored in the group stack entry of the new
    group. returns True when a manpage was found, in which case the words
    that were consumed are popped from parts and the synopsis is added as
    the first result of the new group. returns False when the word was
    expanded or has no manpage; a group without a manpage is still pushed
    in that case, unless addgroup is False'''
    logger.info('startcommand commandnode=%r parts=%r, endword=%r, addgroup=%s',
                commandnode, parts, endword, addgroup)

    def pushgroup(manpage, suggestions, error=None):
        '''create a command group and make it the current one'''
        group = matchgroup(self._generatecommandgroupname())
        if error is not None:
            group.error = error
        group.manpage = manpage
        group.suggestions = suggestions
        self.groups.append(group)
        self.groupstack.append((commandnode, group, endword))

    idxwordnode = bashlex.ast.findfirstkind(parts, 'word')
    assert idxwordnode != -1
    wordnode = parts[idxwordnode]

    if wordnode.parts:
        logger.info('node %r has parts (it was expanded), no point in looking'
                    ' up a manpage for it', wordnode)
        if addgroup:
            pushgroup(None, None)
        return False

    startpos, endpos = wordnode.pos

    try:
        mps = self.findmanpages(wordnode.word)
        # the node is consumed here, it's popped from parts so it isn't
        # visited again as an argument
        parts.pop(idxwordnode)
    except errors.ProgramDoesNotExist as e:
        if addgroup:
            # the command still gets a group, visitword marks its words
            # as unknown later on
            logger.info('no manpage found for %r, adding a group for it',
                        wordnode.word)
            pushgroup(None, None, error=e)
        return False

    manpage = mps[0]
    idxnextwordnode = bashlex.ast.findfirstkind(parts, 'word')

    # the next word might make this a multicommand, which is checked when:
    # - the manpage we found says so
    # - there is another word node
    # - that word node holds no expansions
    if (manpage.multicommand and idxnextwordnode != -1
            and not parts[idxnextwordnode].parts):
        nextwordnode = parts[idxnextwordnode]
        try:
            multi = '%s %s' % (wordnode.word, nextwordnode.word)
            logger.info('%r is a multicommand, trying to get another token and look up %r', manpage, multi)
            mps = self.findmanpages(multi)
            manpage = mps[0]
            # this node is consumed as well, pop it so it isn't visited
            # again as an argument
            parts.pop(idxnextwordnode)
            endpos = nextwordnode.pos[1]
        except errors.ProgramDoesNotExist:
            logger.info('no manpage %r for multicommand %r', multi, manpage)

    # the current command gets its own matchgroup, the remaining man pages
    # are kept as suggestions
    pushgroup(manpage, mps[1:])

    synopsis = manpage.synopsis or helpconstants.NOSYNOPSIS
    self.matches.append(matchresult(startpos, endpos, synopsis, None))
    return True
def endcommand(self):
    '''pop the most recently created command group off the group stack

    groups get pushed by visitcommand or by a nested command, the shell
    group at the bottom of the stack is never popped'''
    depth = len(self.groupstack)
    assert depth >= 2, 'groupstack must contain shell and command groups'
    ended = self.groupstack.pop()
    logger.info('ending group %s', ended)
def visitcommandsubstitution(self, node, command):
    '''record the inside of a command substitution as an expansion

    returns False, the child nodes aren't matched'''
    begin, end = node.pos[0], node.pos[1]
    # the expansion starts right after the opening $( or `
    skip = 2 if self.s[begin] == '$' else 1
    inner = matchwordexpansion(begin + skip, end - 1, 'substitution')
    self.expansions.append(inner)
    return False
def visitprocesssubstitution(self, node, command):
    '''record the inside of a process substitution as an expansion

    returns False, the child nodes aren't matched'''
    # leave out the two opening characters and the closing )
    begin = node.pos[0] + 2
    end = node.pos[1] - 1
    self.expansions.append(matchwordexpansion(begin, end, 'substitution'))
    return False
def visitassignment(self, node, word):
    '''add the assignment help text for the span of node to the shell group'''
    start, end = node.pos[0], node.pos[1]
    explained = matchresult(start, end, helpconstants.ASSIGNMENT, None)
    self.groups[0].results.append(explained)
def visitword(self, node, word):
    '''match a single word of the current command

    the word is first looked up as an option of the current manpage. when
    that fails it may end a nested command, get split into several short
    options, be taken as the argument of the previous option, start a
    nested command or be explained as a positional argument. whatever
    can't be explained is added as an unknown result.

    results are added to the current group (self.matches), nothing is
    returned.
    '''
    def attemptfuzzy(chars):
        '''try to match chars as a series of short options that were
        written together and return the list of matchresults, holding
        unknown results for the tokens that couldn't be matched'''
        m = []
        # with a leading '-' the first token is the dash together with the
        # letter that follows it, every other character is its own token.
        # only then do we consider taking the rest as an argument
        if chars[0] == '-':
            tokens = [chars[0:2]] + list(chars[2:])
            considerarg = True
        else:
            tokens = list(chars)
            considerarg = False

        # pos tracks where the current token starts in the input
        pos = node.pos[0]
        prevoption = None
        for i, t in enumerate(tokens):
            # options are looked up with a leading dash
            op = t if t[0] == '-' else '-' + t
            option = self.find_option(op)
            if option:
                # the very first option expects an argument: whatever
                # follows it in this word is that argument
                if considerarg and not m and option.expectsarg:
                    logger.info('option %r expected an arg, taking the rest too', option)
                    # reset the current option if we already took an argument,
                    # this prevents the next word node to also consider itself
                    # as an argument
                    self._currentoption = None
                    return [matchresult(pos, pos+len(chars), option.text, None)]
                mr = matchresult(pos, pos+len(t), option.text, None)
                m.append(mr)
            # if the previous option expected an argument and we couldn't
            # match the current token, take the rest as its argument, this
            # covers a series of short options where the last one has an argument
            # with no space between it, such as 'xargs -r0n1'
            elif considerarg and prevoption and prevoption.expectsarg:
                pmr = m[-1]
                # stretch the previous result over the remaining tokens
                mr = matchresult(pmr.start, pmr.end+(len(tokens)-i), pmr.text, None)
                m[-1] = mr
                # reset the current option if we already took an argument,
                # this prevents the next word node to also consider itself
                # as an argument
                self._currentoption = None
                break
            else:
                m.append(self.unknown(t, pos, pos+len(t)))
            pos += len(t)
            prevoption = option
        return m
    def _visitword(node, word):
        '''does the actual matching, see the docstring of visitword'''
        if not self.manpage:
            logger.info('inside an unknown command, giving up on %r', word)
            self.matches.append(self.unknown(word, node.pos[0], node.pos[1]))
            return
        logger.info('trying to match token: %r', word)
        # find_option below overwrites _currentoption, keep what the
        # previous word found around
        self._prevoption = self._currentoption
        # a long option is looked up without the part that follows '='
        if word.startswith('--'):
            word = word.split('=', 1)[0]
        option = self.find_option(word)
        if option:
            logger.info('found an exact match for %r: %r', word, option)
            mr = matchresult(node.pos[0], node.pos[1], option.text, None)
            self.matches.append(mr)
            # check if we split the word just above, if we did then reset
            # the current option so the next word doesn't consider itself
            # an argument
            if word != node.word:
                self._currentoption = None
        else:
            # back to the whole word, in case it was split above
            word = node.word
            # check if we're inside a nested command and this word marks the end
            if isinstance(self.groupstack[-1][-1], list) and word in self.groupstack[-1][-1]:
                logger.info('token %r ends current nested command', word)
                self.endcommand()
                # self.matches now refers to the group that encloses the
                # one that just ended; the word is given the same text as
                # the last result of that group
                mr = matchresult(node.pos[0], node.pos[1], self.matches[-1].text, None)
                self.matches.append(mr)
            elif word != '-' and word.startswith('-') and not word.startswith('--'):
                logger.debug('looks like a short option')
                # a dash followed by more than one character might be
                # several short options written together
                if len(word) > 2:
                    logger.info("trying to split it up")
                    self.matches.extend(attemptfuzzy(word))
                else:
                    self.matches.append(self.unknown(word, node.pos[0], node.pos[1]))
            elif self._prevoption and self._prevoption.expectsarg:
                logger.info("previous option possibly expected an arg, and we can't"
                            " find an option to match the current token, assuming it's an arg")
                # when expectsarg is a list, it holds the only words that
                # are accepted as the argument
                ea = self._prevoption.expectsarg
                possibleargs = ea if isinstance(ea, list) else []
                take = True
                if possibleargs and word not in possibleargs:
                    take = False
                    logger.info('token %r not in list of possible args %r for %r',
                                word, possibleargs, self._prevoption)
                if take:
                    # the argument might be the start of a nested command,
                    # in which case startcommand has taken care of it
                    if self._prevoption.nestedcommand:
                        logger.info('option %r can nest commands', self._prevoption)
                        if self.startcommand(None, [node], self._prevoption.nestedcommand, addgroup=False):
                            self._currentoption = None
                            return
                    # stretch the result of the previous option so that it
                    # covers its argument too
                    pmr = self.matches[-1]
                    mr = matchresult(pmr.start, node.pos[1], pmr.text, None)
                    self.matches[-1] = mr
                else:
                    self.matches.append(self.unknown(word, node.pos[0], node.pos[1]))
            else:
                if self.manpage.partialmatch:
                    logger.info('attemping to do a partial match')
                    m = attemptfuzzy(word)
                    # the split is only used if every token was matched
                    if not any(mm.unknown for mm in m):
                        logger.info('found a match for everything, taking it')
                        self.matches.extend(m)
                        return
                if self.manpage.arguments:
                    if self.manpage.nestedcommand:
                        logger.info('manpage %r can nest commands', self.manpage)
                        if self.startcommand(None, [node], self.manpage.nestedcommand, addgroup=False):
                            self._currentoption = None
                            return
                    # explain the word with the text of the first entry in
                    # the arguments of the manpage
                    d = self.manpage.arguments
                    k = list(d.keys())[0]
                    logger.info('got arguments, using %r', k)
                    text = d[k]
                    mr = matchresult(node.pos[0], node.pos[1], text, None)
                    self.matches.append(mr)
                    return
                # if all of that failed, we can't explain it so mark it unknown
                self.matches.append(self.unknown(word, node.pos[0], node.pos[1]))
    _visitword(node, word)
def visitfunction(self, node, name, body, parts):
    '''remember the function being defined and explain its declaration

    returns False since the body is visited right here'''
    self.functions.add(name.word)

    def iscurlygroup(compound):
        '''does compound open with a reserved { and close with a reserved }'''
        opening, closing = compound.list[0], compound.list[-1]
        return (opening.kind == 'reservedword' and
                closing.kind == 'reservedword' and
                opening.word == '{' and closing.word == '}')

    shellresults = self.groups[0].results

    if not iscurlygroup(body):
        idxbeforebody = bashlex.ast.findfirstkind(parts, 'compound') - 1
        assert idxbeforebody > 0
        beforebody = parts[idxbeforebody]

        # the declaration spans everything up to the end of the node that
        # comes right before the body
        shellresults.append(matchresult(node.pos[0], beforebody.pos[1],
                                        helpconstants._function, None))
        self.visit(body)
        return False

    # a { } body would normally be explained as a group command, but in
    # this context it's more informative to make the curly braces part of
    # the function declaration
    opening, closing = body.list[0], body.list[-1]

    # from the start of the declaration until after the opening {
    shellresults.append(matchresult(node.pos[0], opening.pos[1],
                                    helpconstants._function, None))
    # the closing }
    shellresults.append(matchresult(closing.pos[0], closing.pos[1],
                                    helpconstants._function, None))

    # and whatever is between the curly braces
    for part in body.list[1:-1]:
        self.visit(part)

    return False
def visittilde(self, node, value):
    '''record the span of node as a tilde expansion'''
    start, end = node.pos[0], node.pos[1]
    self.expansions.append(matchwordexpansion(start, end, 'tilde'))
def visitparameter(self, node, value):
    '''record the span of node as a parameter expansion

    the kind is 'parameter-digits' when value converts to an int,
    otherwise it's taken from helpconstants.parameters with 'param' as
    the default'''
    try:
        int(value)
    except ValueError:
        suffix = helpconstants.parameters.get(value, 'param')
    else:
        suffix = 'digits'

    expansion = matchwordexpansion(node.pos[0], node.pos[1],
                                   'parameter-%s' % suffix)
    self.expansions.append(expansion)
def match(self):
    '''parse self.s, match everything in it and return the list of
    matchgroups

    the parsed tree is kept in self.ast and visited to collect the
    results. whatever wasn't covered is then marked unknown, adjacent
    results are merged and every result gets the portion of the input it
    spans as its match. self.expansions is sorted before returning.

    if the input is made of a single command that has no manpage and
    there are no shell results or expansions, the error that was stored
    on its group is raised.
    '''
    logger.info('matching string %r', self.s)

    # limit recursive parsing to a depth of 1
    self.ast = bashlex.parser.parsesingle(self.s, expansionlimit=1,
                                          strictmode=False)
    if self.ast:
        self.visit(self.ast)
        assert len(self.groupstack) == 1, 'groupstack should contain only shell group after matching'

        # if we only have one command in there and no shell results/expansions,
        # reraise the original exception
        if (len(self.groups) == 2 and not self.groups[0].results and
                self.groups[1].manpage is None and not self.expansions):
            raise self.groups[1].error
    else:
        # Logger.warn is a deprecated alias, warning is the one to use
        logger.warning('no AST generated for %r', self.s)

    self._markunparsedunknown()

    # fix each matchgroup separately
    for group in self.groups:
        if not group.results:
            continue

        if getattr(group, 'manpage', None):
            # ensure that the program part isn't unknown (i.e. it has
            # something as its synopsis)
            assert not group.results[0].unknown

        group.results = self._mergeadjacent(group.results)

        # add matchresult.match to existing matches
        for i, m in enumerate(group.results):
            assert m.end <= len(self.s), '%d %d' % (m.end, len(self.s))

            portion = self.s[m.start:m.end]
            group.results[i] = matchresult(m.start, m.end, m.text, portion)

    # the dump goes over every match, only build it if it ends up being
    # logged
    if logger.isEnabledFor(logging.DEBUG):
        dump = '\n'.join('%d) %r = %r' % (i, self.s[m.start:m.end], m.text)
                         for i, m in enumerate(self.allmatches))
        logger.debug('%r matches:\n%s', self.s, dump)

    # not strictly needed, but doesn't hurt
    self.expansions.sort()

    return self.groups
def _markunparsedunknown(self):
    '''add shell results for the characters that no result covers

    the parser may leave a remainder at the end of the string if it
    doesn't match any of the rules. every such character becomes an
    unknown result, except for whitespace which is skipped and a comment
    which is explained as one result that runs to the end of the input'''
    covered = [False] * len(self.s)

    # tick off every position that an existing match spans
    for mr in self.allmatches:
        for pos in range(mr.start, mr.end):
            covered[pos] = True

    shellresults = self.groups[0].results
    for pos, char in enumerate(self.s):
        # whitespace never needs an explanation
        if char.isspace():
            covered[pos] = True

        # comments are ignored by the parser, but a '#' that lies beyond
        # the end of the parsed portion of the input (or any '#' if there
        # is no ast) gives one away. nothing after it needs checking
        if (not self.ast or pos > self.ast.pos[1]) and char == '#':
            shellresults.append(
                matchresult(pos, len(covered), helpconstants.COMMENT, None))
            break

        if not covered[pos]:
            # what the parser left out goes to the 'shell' group
            shellresults.append(self.unknown(char, pos, pos + 1))

    # results don't overlap, ordering them by where they start is enough
    shellresults.sort(key=lambda mr: mr.start)
def _resultindex(self):
    '''return a dict that maps every matchresult to its position among
    all the matches, once those are ordered by where they start'''
    ordered = sorted(self.allmatches, key=lambda mr: mr.start)
    return {result: position for position, result in enumerate(ordered)}
def _mergeadjacent(self, matches):
    '''return matches with every run of neighbouring results that share
    the same text replaced by a single result that spans the run

    whether results are neighbours is decided by their position among all
    the matches (see _resultindex) together with util.groupcontinuous,
    which presumably yields the runs whose positions are consecutive'''
    positions = self._resultindex()
    merged = []

    for text, sametext in itertools.groupby(matches, lambda m: m.text):
        runs = util.groupcontinuous(sametext, key=lambda m: positions[m])
        for run in runs:
            if len(run) == 1:
                merged.append(run[0])
                continue

            first, last = run[0], run[-1]
            # the combined result takes over the position of the last
            # result of the run
            lastposition = positions[last]
            for mr in run:
                del positions[mr]

            combined = matchresult(first.start, last.end, text, None)
            merged.append(combined)
            positions[combined] = lastposition

    return merged