mitmproxy.addonmanager
import contextlib
import inspect
import logging
import pprint
import traceback
import types
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from typing import Any, Optional

import sys

from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import hooks

logger = logging.getLogger(__name__)


def _get_name(itm):
    """
    Return the lookup name of an addon: its `name` attribute if it has one,
    otherwise the lower-cased name of its class.
    """
    return getattr(itm, "name", itm.__class__.__name__.lower())


def cut_traceback(tb, func_name):
    """
    Cut off a traceback at the function with the given name.
    The func_name's frame is excluded.

    If nothing remains after cutting (the name was never found, or it
    belonged to the innermost frame), the original traceback is returned.

    Args:
        tb: traceback object, as returned by sys.exc_info()[2]
        func_name: function name

    Returns:
        Reduced traceback.
    """
    tb_orig = tb
    for _, _, fname, _ in traceback.extract_tb(tb):
        # Advance first, so that the matching frame itself is dropped too.
        tb = tb.tb_next
        if fname == func_name:
            break
    # tb is None once we have walked past the last frame; fall back to the
    # full traceback rather than returning nothing.
    return tb or tb_orig


@contextlib.contextmanager
def safecall():
    """
    Context manager that logs exceptions raised in its body instead of
    propagating them.

    AddonHalt and OptionsError are re-raised unchanged. Any other Exception
    is logged as an "Addon error" with the traceback trimmed past the
    invoke_addon_sync / invoke_addon frames, and is then swallowed.
    """
    try:
        yield
    except (exceptions.AddonHalt, exceptions.OptionsError):
        raise
    except Exception:
        etype, value, tb = sys.exc_info()
        # Drop the addon manager's own dispatch frames from the report.
        tb = cut_traceback(tb, "invoke_addon_sync")
        tb = cut_traceback(tb, "invoke_addon")
        logger.error(
            "Addon error: %s" % "".join(traceback.format_exception(etype, value, tb))
        )


class Loader:
    """
    A loader object is passed to the load() event when addons start up.
    """

    def __init__(self, master):
        """Store the master whose options and commands this loader extends."""
        self.master = master

    def add_option(
        self,
        name: str,
        typespec: type,
        default: Any,
        help: str,
        choices: Optional[Sequence[str]] = None,
    ) -> None:
        """
        Add an option to mitmproxy.

        Help should be a single paragraph with no linebreaks - it will be
        reflowed by tools. Information on the data type should be omitted -
        it will be generated and added by tools as needed.

        If an option with this name already exists and every field matches,
        the call is a no-op. If it exists with different fields, a warning
        is logged and the option is added again with the new definition.
        """
        if name in self.master.options:
            existing = self.master.options._options[name]
            same_signature = (
                existing.name == name
                and existing.typespec == typespec
                and existing.default == default
                and existing.help == help
                and existing.choices == choices
            )
            if same_signature:
                return
            else:
                logger.warning("Over-riding existing option %s" % name)
        self.master.options.add_option(name, typespec, default, help, choices)

    def add_command(self, path: str, func: Callable) -> None:
        """Add a command to mitmproxy.

        Unless you are generating commands programmatically,
        this API should be avoided. Decorate your function with `@mitmproxy.command.command` instead.
        """
        self.master.commands.add(path, func)


def traverse(chain):
    """
    Recursively traverse an addon chain.

    Yields each addon before the contents of its `addons` attribute
    (if it has one), i.e. in depth-first, parent-first order.
    """
    for a in chain:
        yield a
        if hasattr(a, "addons"):
            yield from traverse(a.addons)


@dataclass
class LoadHook(hooks.Hook):
    """
    Called when an addon is first loaded. This event receives a Loader
    object, which contains methods for adding options and commands. This
    method is where the addon configures itself.
    """

    # The Loader handed to the addon's load handler.
    loader: Loader


class AddonManager:
    """
    Keeps the ordered chain of top-level addons plus a name -> addon lookup
    that also covers sub-addons, and dispatches hook events to them.
    """

    def __init__(self, master):
        """
        Create an empty manager for the given master and connect
        `_configure_all` to `master.options.changed`.
        """
        # name -> addon, for every registered addon including sub-addons.
        self.lookup = {}
        # Top-level addons, in the order they were added.
        self.chain = []
        self.master = master
        # presumably a signal fired on option changes - confirm against the
        # options implementation.
        master.options.changed.connect(self._configure_all)

    def _configure_all(self, updated):
        """Trigger a ConfigureHook carrying `updated` across all addons."""
        self.trigger(hooks.ConfigureHook(updated))

    def clear(self):
        """
        Remove all addons.

        The done event is invoked on every addon in the chain before the
        chain and the lookup table are reset.
        """
        for a in self.chain:
            self.invoke_addon_sync(a, hooks.DoneHook())
        self.lookup = {}
        self.chain = []

    def get(self, name):
        """
        Retrieve an addon by name. Addon names are equal to the .name
        attribute on the instance, or the lower case class name if that
        does not exist.

        Returns None if no addon with that name is registered.
        """
        return self.lookup.get(name, None)

    def register(self, addon):
        """
        Register an addon, call its load event, and then register all its
        sub-addons. This should be used by addons that dynamically manage
        addons.

        If the calling addon is already running, it should follow with
        running and configure events. Must be called within a current
        context.

        Returns the addon that was passed in. Raises AddonManagerError if
        the addon, or one of its sub-addons, has a name that is already
        registered.
        """
        api_changes = {
            # mitmproxy 6 -> mitmproxy 7
            "clientconnect": f"The clientconnect event has been removed, use client_connected instead",
            "clientdisconnect": f"The clientdisconnect event has been removed, use client_disconnected instead",
            "serverconnect": "The serverconnect event has been removed, use server_connect and server_connected instead",
            "serverdisconnect": f"The serverdisconnect event has been removed, use server_disconnected instead",
            # mitmproxy 8 -> mitmproxy 9
            "add_log": "The add_log event has been deprecated, use Python's builtin logging module instead",
        }
        # First pass: warn about outdated handler names and reject name
        # clashes before anything is loaded.
        for a in traverse([addon]):
            for old, msg in api_changes.items():
                if hasattr(a, old):
                    logger.warning(
                        f"{msg}. For more details, see https://docs.mitmproxy.org/dev/addons-api-changelog/."
                    )
            name = _get_name(a)
            if name in self.lookup:
                raise exceptions.AddonManagerError(
                    "An addon called '%s' already exists." % name
                )
        l = Loader(self.master)
        self.invoke_addon_sync(addon, LoadHook(l))
        # The addon tree is traversed again after the load event, so the
        # lookup reflects whatever `addons` attributes exist at that point.
        for a in traverse([addon]):
            name = _get_name(a)
            self.lookup[name] = a
        for a in traverse([addon]):
            self.master.commands.collect_commands(a)
        self.master.options.process_deferred()
        return addon

    def add(self, *addons):
        """
        Add addons to the end of the chain, and run their load event.
        If any addon has sub-addons, they are registered.
        """
        for i in addons:
            self.chain.append(self.register(i))

    def remove(self, addon):
        """
        Remove an addon and all its sub-addons.

        If the addon is not in the chain - that is, if it's managed by a
        parent addon - it's the parent's responsibility to remove it from
        its own addons attribute.

        Raises AddonManagerError if the addon or one of its sub-addons is
        not registered. The done event is invoked on the addon once its
        whole tree has been dropped from the lookup.
        """
        for a in traverse([addon]):
            n = _get_name(a)
            if n not in self.lookup:
                raise exceptions.AddonManagerError("No such addon: %s" % n)
            # Identity comparison: only this exact instance leaves the chain.
            self.chain = [i for i in self.chain if i is not a]
            del self.lookup[_get_name(a)]
        self.invoke_addon_sync(addon, hooks.DoneHook())

    def __len__(self):
        """Return the number of top-level addons in the chain."""
        return len(self.chain)

    def __str__(self):
        """Return a pretty-printed list of the chain's addons as strings."""
        return pprint.pformat([str(i) for i in self.chain])

    def __contains__(self, item):
        """Return whether an addon with the same name as `item` is registered."""
        name = _get_name(item)
        return name in self.lookup

    async def handle_lifecycle(self, event: hooks.Hook):
        """
        Handle a lifecycle event.

        The event is triggered across all addons; if its first argument is
        a flow, an UpdateHook for that flow is triggered afterwards.
        """
        message = event.args()[0]

        await self.trigger_event(event)

        if isinstance(message, flow.Flow):
            await self.trigger_event(hooks.UpdateHook([message]))

    def _iter_hooks(self, addon, event: hooks.Hook):
        """
        Enumerate all hook callables belonging to the given addon

        Yields (addon, handler) pairs for the addon and its sub-addons.
        Raises AddonManagerError if an attribute named like the event is
        truthy but is neither callable nor a module.
        """
        assert isinstance(event, hooks.Hook)
        for a in traverse([addon]):
            func = getattr(a, event.name, None)
            if func:
                if callable(func):
                    yield a, func
                elif isinstance(func, types.ModuleType):
                    # we gracefully exclude module imports with the same name as hooks.
                    # For example, a user may have "from mitmproxy import log" in an addon,
                    # which has the same name as the "log" hook. In this particular case,
                    # we end up in an error loop because we "log" this error.
                    pass
                else:
                    raise exceptions.AddonManagerError(
                        f"Addon handler {event.name} ({a}) not callable"
                    )

    async def invoke_addon(self, addon, event: hooks.Hook):
        """
        Asynchronously invoke an event on an addon and all its children.
        """
        for addon, func in self._iter_hooks(addon, event):
            res = func(*event.args())
            # Support both async and sync hook functions
            if res is not None and inspect.isawaitable(res):
                await res

    def invoke_addon_sync(self, addon, event: hooks.Hook):
        """
        Invoke an event on an addon and all its children.

        Raises AddonManagerError if a handler is a coroutine function,
        since it cannot be awaited from here.
        """
        for addon, func in self._iter_hooks(addon, event):
            if inspect.iscoroutinefunction(func):
                raise exceptions.AddonManagerError(
                    f"Async handler {event.name} ({addon}) cannot be called from sync context"
                )
            func(*event.args())

    async def trigger_event(self, event: hooks.Hook):
        """
        Asynchronously trigger an event across all addons.

        An AddonHalt raised by a handler stops delivery to the remaining
        addons in the chain.
        """
        for i in self.chain:
            try:
                with safecall():
                    await self.invoke_addon(i, event)
            except exceptions.AddonHalt:
                return

    def trigger(self, event: hooks.Hook):
        """
        Trigger an event across all addons.

        This API is discouraged and may be deprecated in the future.
        Use `trigger_event()` instead, which provides the same functionality but supports async hooks.
        """
        for i in self.chain:
            try:
                with safecall():
                    self.invoke_addon_sync(i, event)
            except exceptions.AddonHalt:
                return
class
Loader:
class Loader:
    """
    Handed to an addon's load() event at startup so that the addon can
    register its options and commands with the master.
    """

    def __init__(self, master):
        self.master = master

    def add_option(
        self,
        name: str,
        typespec: type,
        default: Any,
        help: str,
        choices: Optional[Sequence[str]] = None,
    ) -> None:
        """
        Register an option with mitmproxy.

        Write the help text as one paragraph without linebreaks; tools
        reflow it. Leave out data type information, tools generate and
        append that themselves.

        Registering an option again with exactly the same definition does
        nothing. Registering it with a different definition logs a warning
        and then adds the new definition.
        """
        options = self.master.options
        if name in options:
            current = options._options[name]
            requested = (
                ("name", name),
                ("typespec", typespec),
                ("default", default),
                ("help", help),
                ("choices", choices),
            )
            # Compared lazily and in order, stopping at the first mismatch.
            if all(getattr(current, attr) == value for attr, value in requested):
                return
            logger.warning("Over-riding existing option %s" % name)
        options.add_option(name, typespec, default, help, choices)

    def add_command(self, path: str, func: Callable) -> None:
        """Register a command with mitmproxy under the given path.

        Only use this when commands are generated programmatically;
        otherwise decorate the function with `@mitmproxy.command.command`.
        """
        commands = self.master.commands
        commands.add(path, func)
A loader object is passed to the load() event when addons start up.
def
add_option( self, name: str, typespec: type, default: Any, help: str, choices: Optional[collections.abc.Sequence[str]] = None) -> None:
def add_option(
    self,
    name: str,
    typespec: type,
    default: Any,
    help: str,
    choices: Optional[Sequence[str]] = None,
) -> None:
    """
    Register an option with mitmproxy.

    Write the help text as one paragraph without linebreaks; tools reflow
    it. Leave out data type information, tools generate and append that
    themselves.

    Registering an option again with exactly the same definition does
    nothing. Registering it with a different definition logs a warning and
    then adds the new definition.
    """
    registry = self.master.options
    if name in registry:
        known = registry._options[name]
        wanted = {
            "name": name,
            "typespec": typespec,
            "default": default,
            "help": help,
            "choices": choices,
        }
        # Fields are checked lazily, in declaration order, stopping at the
        # first one that differs.
        if all(getattr(known, field) == value for field, value in wanted.items()):
            return
        logger.warning("Over-riding existing option %s" % name)
    registry.add_option(name, typespec, default, help, choices)
Add an option to mitmproxy.
Help should be a single paragraph with no linebreaks - it will be reflowed by tools. Information on the data type should be omitted - it will be generated and added by tools as needed.
def
add_command(self, path: str, func: collections.abc.Callable) -> None:
def add_command(self, path: str, func: Callable) -> None:
    """Register a command with mitmproxy under the given path.

    Only use this when commands are generated programmatically;
    otherwise decorate the function with `@mitmproxy.command.command`.
    """
    commands = self.master.commands
    commands.add(path, func)
Add a command to mitmproxy.
Unless you are generating commands programmatically,
this API should be avoided. Decorate your function with @mitmproxy.command.command
instead.