Source Code for Module lamson.routing

"""
The meat of Lamson, doing all the work that actually takes an email and makes
sure that your code gets it.

The three most important parts for a programmer are the Router variable, the
StateStorage base class, and the @route, @route_like, and @stateless decorators.

The lamson.routing.Router variable (it's not a class, just named like one) is
how the whole system gets to the Router.  It is an instance of RoutingBase and
there's usually only one.

The lamson.routing.StateStorage is what you need to implement if you want Lamson
to store the state in a different way.  By default the lamson.routing.Router
object just uses a default MemoryStorage to do its job.  If you want to use a
custom storage, then in your config/boot.py (or config/testing.py) you would set
lamson.routing.Router.STATE_STORE to what you want to use.

Finally, when you write a state handler, it has functions that act as state
functions for dealing with each state.  To tell the Router what function should
handle what email you use a @route decorator.  To tell the Router that one
function routes the same as another use @route_like.  In the case where a state
function should run on every matching email, just use the @stateless decorator
after a @route or @route_like.

If at any time you need to debug your routing setup just use the lamson routes
command.

Routing Control
===============

To control routing there is a set of decorators that you apply to your
functions.

* @route -- The main routing function that determines what addresses you are
interested in.
* @route_like -- Says that this function routes like another one.
* @stateless -- Indicates this function always runs on each route encountered, and
no state is maintained.
* @nolocking -- Use this if you want this handler to run in parallel without any
locking around Lamson internals.  SUPER DANGEROUS, add @stateless as well.
* @state_key_generator -- Used on a function that knows how to make your state
keys for the module, for example if module_name + message.route_to is needed to maintain
state.

It's best to put @route or @route_like as the first decorator, then the others
after that.

The @state_key_generator is different since it's not intended to go on a handler
but instead on a simple function, so it shouldn't be combined with the others.
"""

from __future__ import with_statement
from functools import wraps
import re
import logging
import sys
import email.utils
import shelve
import threading

ROUTE_FIRST_STATE = 'START'
LOG = logging.getLogger("routing")
DEFAULT_STATE_KEY = lambda mod, msg: mod


class StateStorage(object):
    """
    The base storage class you need to implement for a custom storage
    system.
    """
    def get(self, key, sender):
        """
        You must implement this so that it returns a single string
        of either the state for this combination of arguments, OR
        the ROUTE_FIRST_STATE setting.
        """
        raise NotImplementedError("You have to implement a StateStorage.get.")

    def set(self, key, sender, state):
        """
        Set should take the given parameters and consistently set the state for
        that combination such that when StateStorage.get is called it gives back
        the same setting.
        """
        raise NotImplementedError("You have to implement a StateStorage.set.")

    def clear(self):
        """
        This should clear ALL states, it is only used in unit testing, so you
        can have it raise an exception if you want to make this safer.
        """
        raise NotImplementedError("You have to implement a StateStorage.clear for unit testing to work.")

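As a rough illustration of the StateStorage contract above, here is a minimal sketch of a custom storage backed by the standard library's sqlite3 module. The class name SqliteStorage, the table layout, and the stand-in StateStorage base class are assumptions made so the example runs on its own; in a real application you would subclass lamson.routing.StateStorage instead.

```python
import sqlite3

ROUTE_FIRST_STATE = 'START'  # mirrors the module-level constant


class StateStorage(object):
    """Stand-in for lamson.routing.StateStorage so this sketch runs alone."""
    def get(self, key, sender):
        raise NotImplementedError("get")

    def set(self, key, sender, state):
        raise NotImplementedError("set")

    def clear(self):
        raise NotImplementedError("clear")


class SqliteStorage(StateStorage):
    """Hypothetical storage keeping one row per (key, sender) pair."""

    def __init__(self, path=":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS states ("
            "key TEXT, sender TEXT, state TEXT, "
            "PRIMARY KEY (key, sender))")

    def get(self, key, sender):
        # Unknown combinations fall back to the first state, as required.
        row = self.db.execute(
            "SELECT state FROM states WHERE key = ? AND sender = ?",
            (key, sender)).fetchone()
        return row[0] if row else ROUTE_FIRST_STATE

    def set(self, key, sender, state):
        # Like MemoryStorage, returning to the first state drops the record.
        if state == ROUTE_FIRST_STATE:
            self.db.execute(
                "DELETE FROM states WHERE key = ? AND sender = ?",
                (key, sender))
        else:
            self.db.execute(
                "INSERT OR REPLACE INTO states (key, sender, state) "
                "VALUES (?, ?, ?)", (key, sender, state))
        self.db.commit()

    def clear(self):
        self.db.execute("DELETE FROM states")
        self.db.commit()


store = SqliteStorage()
store.set("app.handlers.sample", "joe@example.com", "CONFIRMING")
```

This sketch uses a single connection and does no locking of its own, so it is only suitable where one thread touches the storage at a time.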

class MemoryStorage(StateStorage):
    """
    The default simplified storage for the Router to hold the states.  This
    should only be used in testing, as you'll lose all your contacts and their
    states if your server shuts down.  It is also horribly NOT thread safe.
    """
    def __init__(self):
        self.states = {}

    def get(self, key, sender):
        key = self.key(key, sender)
        try:
            return self.states[key]
        except KeyError:
            return ROUTE_FIRST_STATE

    def set(self, key, sender, state):
        key = self.key(key, sender)
        if state == ROUTE_FIRST_STATE:
            try:
                del self.states[key]
            except KeyError:
                pass
        else:
            self.states[key] = state

    def key(self, key, sender):
        return repr([key, sender])

    def clear(self):
        self.states.clear()


class ShelveStorage(MemoryStorage):
    """
    Uses Python's shelve to store the state of the Routers to disk rather than
    in memory like with MemoryStorage.  This will get you going on a small
    install if you need to persist your states (most likely), but if you
    have a database, you'll need to write your own StateStorage that
    uses your ORM or database to store.  Consider this an example.

    NOTE: Because of shelve limitations you can only use ASCII encoded keys.
    """
    def __init__(self, database_path):
        """Database path depends on the backing library used by Python's shelve."""
        self.database_path = database_path
        self.lock = threading.RLock()

    def get(self, key, sender):
        """
        This will lock the internal thread lock, and then retrieve from the
        shelf whatever key you request.  If the key is not found then it
        returns ROUTE_FIRST_STATE.
        """
        with self.lock:
            self.states = shelve.open(self.database_path)
            value = super(ShelveStorage, self).get(key.encode('ascii'), sender)
            self.states.close()
            return value

    def set(self, key, sender, state):
        """
        Acquires the self.lock and then sets the requested state in the shelf.
        """
        with self.lock:
            self.states = shelve.open(self.database_path)
            super(ShelveStorage, self).set(key.encode('ascii'), sender, state)
            self.states.close()

    def clear(self):
        """
        Primarily used in the debugging/unit testing process to make sure the
        states are clear.  In production this could be a bad thing.
        """
        with self.lock:
            self.states = shelve.open(self.database_path)
            super(ShelveStorage, self).clear()
            self.states.close()



class RoutingBase(object):
    """
    The Router is a globally accessible instance of this class that is
    actually more like a glorified module.  It is used mostly internally by
    the lamson.routing decorators (route, route_like, stateless) to control
    the routing mechanism.

    It keeps track of the registered routes, their attached functions, the
    order that these routes should be evaluated, any default routing captures,
    and uses the MemoryStorage by default to keep track of the states.

    You can change the storage to another implementation by simply setting:

        Router.STATE_STORE = OtherStorage()

    In a config/settings.py file.

    RoutingBase does locking on every write to its internal data (which usually
    only happens during booting and reloading while debugging), and when each
    handler's state function is called.  ALL threads will go through this lock,
    but only as each state is run, so you won't have a situation where the chain
    of state functions will block all the others.  This means that while your
    handler runs nothing else will be running, but you have no guarantees about
    the order of each state function.

    However, this can kill the performance of some kinds of state functions,
    so if you find the need to not have locking, then use the @nolocking
    decorator and the Router will NOT lock when that function is called.  That
    means while your @nolocking state function is running at least one other
    thread (more if the next ones happen to be @nolocking) could also be
    running.

    It's your job to keep things straight if you do that.

    NOTE: See @state_key_generator for a way to change what the key is to
    STATE_STORE for different state control options.
    """

    def __init__(self):
        self.REGISTERED = {}
        self.ORDER = []
        self.DEFAULT_CAPTURES = {}
        self.STATE_STORE = MemoryStorage()
        self.HANDLERS = {}
        self.RELOAD = False
        self.LOG_EXCEPTIONS = True
        self.UNDELIVERABLE_QUEUE = None
        self.lock = threading.RLock()
        self.call_lock = threading.RLock()

    def register_route(self, format, func):
        """
        Registers this function func into the routes mapping based on the
        format given.  Format should be a regex string ready to be handed to
        re.compile.
        """
        with self.lock:
            if format in self.REGISTERED:
                self.REGISTERED[format][1].append(func)
            else:
                self.ORDER.append(format)
                self.REGISTERED[format] = (re.compile(format, re.IGNORECASE), [func])

    def match(self, address):
        """
        This is a generator that goes through all the routes and
        yields each match it finds.  It expects you to give it a
        blah@blah.com address, NOT "Joe Blow" <blah@blah.com>.
        """
        for format in self.ORDER:
            regex, functions = self.REGISTERED[format]
            match = regex.match(address)
            if match:
                yield functions, match.groupdict()

    def defaults(self, **captures):
        """
        Updates the defaults for routing captures with the given settings.

        You use this in your handlers or your config/settings.py to set
        common regular expressions you'll have in your @route decorators.
        This saves you typing, but also makes it easy to reconfigure later.

        For example, many times you'll have a single host="..." regex
        for all your application's routes.  Put this in your settings.py
        file using route_defaults={'host': '...'} and you're done.
        """
        with self.lock:
            self.DEFAULT_CAPTURES.update(captures)

    def get_state(self, module_name, message):
        """Returns the state that this module is in for the given message (using its from)."""
        key = self.state_key(module_name, message)
        return self.STATE_STORE.get(key, message.route_from)


    def in_state(self, func, message):
        """
        Determines if this function is in the state for the to/from in the
        message.  Doesn't apply to @stateless state handlers.
        """
        state = self.get_state(func.__module__, message)
        return state and state == func.__name__

    def in_error(self, func, message):
        """
        Determines if this function is in the 'ERROR' state,
        which is a special state that the Router puts handlers in that throw
        an exception.
        """
        state = self.get_state(func.__module__, message)
        return state and state == 'ERROR'

    def state_key(self, module_name, message):
        """
        Given a module_name we need to get a state key for, and a
        message that has information to make the key, this function
        calls any registered @state_key_generator and returns that
        as the key.  If none is given then it just returns module_name
        as the key.
        """
        key_func = self.HANDLERS.get(module_name, DEFAULT_STATE_KEY)
        return key_func(module_name, message)

    def set_state(self, module_name, message, state):
        """
        Sets the state of the given module (a string) according to the message to the requested
        state (a string).  This is also how you can force another FSM to a required state.
        """
        key = self.state_key(module_name, message)
        self.STATE_STORE.set(key, message.route_from, state)

    def _collect_matches(self, message, route_to):
        in_state_found = False

        for functions, matchkw in self.match(route_to):
            for func in functions:
                if lamson_setting(func, 'stateless'):
                    yield func, matchkw
                elif not in_state_found and self.in_state(func, message):
                    in_state_found = True
                    yield func, matchkw

    def _enqueue_undeliverable(self, message):
        if self.UNDELIVERABLE_QUEUE:
            LOG.debug("Message to %r from %r undeliverable, putting in undeliverable queue (# of recipients: %d).",
                      message.route_to, message.route_from, len(message.route_to))
            self.UNDELIVERABLE_QUEUE.push(message)
        else:
            LOG.debug("Message to %r from %r didn't match any handlers. (# recipients: %d)",
                      message.route_to, message.route_from, len(message.route_to))

    def deliver(self, message):
        """
        The meat of the whole Lamson operation, this method takes all the
        arguments given, and then goes through the routing listing to figure out
        which state handlers should get the gear.  The routing operates on a
        simple set of rules:

            1) Match on all functions that match the given To in their
            registered format pattern.
            2) Call all @stateless state handler functions.
            3) Call the first method that's in the right state for the From/To.

        It will log which handlers are being run, and you can use the 'lamson routes'
        command to inspect and debug routing problems.

        If you have an ERROR state function, then when your state blows up, it will
        transition to ERROR state and call your function right away.  It will then
        stay in the ERROR state unless you return a different one.
        """
        if self.RELOAD: self.reload()

        called_count = 0

        for routing_on in message.route_to:
            for func, matchkw in self._collect_matches(message, routing_on):
                LOG.debug("Matched %r against %s.", routing_on, func.__name__)

                if lamson_setting(func, 'nolocking'):
                    self.call_safely(func, message, matchkw)
                else:
                    with self.call_lock:
                        self.call_safely(func, message, matchkw)

                called_count += 1

        if called_count == 0:
            self._enqueue_undeliverable(message)


    def call_safely(self, func, message, kwargs):
        """
        Used by the Router to call a function and log exceptions rather than
        explode and crash.
        """
        from lamson.server import SMTPError

        try:
            func(message, **kwargs)
            LOG.debug("Message to %s was handled by %s.%s",
                      message.route_to, func.__module__, func.__name__)
        except SMTPError:
            raise
        except:
            self.set_state(func.__module__, message, 'ERROR')

            if self.UNDELIVERABLE_QUEUE:
                self.UNDELIVERABLE_QUEUE.push(message)

            if self.LOG_EXCEPTIONS:
                LOG.exception("!!! ERROR handling %s.%s", func.__module__, func.__name__)
            else:
                raise


    def clear_states(self):
        """Clears out the states for unit testing."""
        with self.lock:
            self.STATE_STORE.clear()

    def clear_routes(self):
        """Clears out the routes for unit testing and reloading."""
        with self.lock:
            self.REGISTERED.clear()
            del self.ORDER[:]


    def load(self, handlers):
        """
        Loads the listed handlers making them available for processing.
        This is safe to call multiple times and to duplicate handlers
        listed.
        """
        with self.lock:
            for module in handlers:
                try:
                    __import__(module, globals(), locals())

                    if module not in self.HANDLERS:
                        # they didn't specify a key generator, so use the
                        # default one for now
                        self.HANDLERS[module] = DEFAULT_STATE_KEY
                except:
                    if self.LOG_EXCEPTIONS:
                        LOG.exception("ERROR IMPORTING %r MODULE:" % module)
                    else:
                        raise

    def reload(self):
        """
        Performs a reload of all the handlers and clears out all routes,
        but doesn't touch the internal state.
        """
        with self.lock:
            self.clear_routes()
            for module in sys.modules.keys():
                if module in self.HANDLERS:
                    try:
                        reload(sys.modules[module])
                    except:
                        if self.LOG_EXCEPTIONS:
                            LOG.exception("ERROR RELOADING %r MODULE:" % module)
                        else:
                            raise

Router = RoutingBase()
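
The selection rule that deliver() and _collect_matches() apply can be sketched in isolation. The snippet below is a simplified model, not Lamson's code: the function collect_matches, the handler names, and the (name, is_stateless) tuples are all hypothetical, and it assumes every candidate has already matched the recipient address.

```python
def collect_matches(candidates, current_state):
    """
    candidates: list of (name, is_stateless) pairs in registration order.
    current_state: the state name stored for this sender.
    Returns the names of the handlers that would be called, in order.
    """
    in_state_found = False
    selected = []

    for name, is_stateless in candidates:
        if is_stateless:
            # Stateless handlers run on every matching message.
            selected.append(name)
        elif not in_state_found and name == current_state:
            # Only the first handler in the sender's current state runs.
            in_state_found = True
            selected.append(name)

    return selected


candidates = [
    ("START", False),
    ("LOGGER", True),
    ("CONFIRMING", False),
    ("ARCHIVER", True),
]

new_sender = collect_matches(candidates, "START")
confirming = collect_matches(candidates, "CONFIRMING")
errored = collect_matches(candidates, "ERROR")
```

In this model a sender stuck in a state with no matching handler (here ERROR) still reaches the stateless handlers, which is why stateless handlers suit logging or archiving.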

class route(object):
    """
    The @route decorator is attached to state handlers to configure them in the
    Router so that it delivers messages to them.  Rather than routing only on
    the address a message is sent to, it also uses the state of the sender.
    It's like having routing in a web application use both the URL and an
    internal state setting to determine which method to run.

    However, if you'd rather this state handler process all messages
    matching the @route then tag it @stateless.  This will run the handler
    no matter what and not change the user's state.
    """

    def __init__(self, format, **captures):
        """
        Sets up the pattern used for the Router configuration.  The format
        parameter is a simple pattern of words, captures, and anything you
        want to ignore.  The captures parameter is a mapping of the words in
        the format to regex that get put into the format.  When the pattern is
        matched, the captures are handed to your state handler as keyword
        arguments.

        For example, if you have:

            @route("(list_name)-(action)@(host)",
                list_name='[a-z]+',
                action='[a-z]+', host='test\.com')
            def STATE(message, list_name=None, action=None, host=None):
                ....

        Then this will be translated so that list_name is replaced with [a-z]+,
        action with [a-z]+, and host with 'test\.com' to produce a regex with the
        right format and named captures so that your state handler is called
        with the proper keyword parameters.

        You should also use the Router.defaults() to set default things like the
        host so that you are not putting it into your code.
        """
        self.captures = Router.DEFAULT_CAPTURES.copy()
        self.captures.update(captures)
        self.format = self.parse_format(format, self.captures)

    def __call__(self, func):
        """Returns either a decorator that does a stateless routing or
        a normal routing."""
        self.setup_accounting(func)

        if lamson_setting(func, 'stateless'):
            @wraps(func)
            def routing_wrapper(message, *args, **kw):
                next_state = func(message, *args, **kw)
        else:
            @wraps(func)
            def routing_wrapper(message, *args, **kw):
                next_state = func(message, *args, **kw)

                if next_state:
                    Router.set_state(next_state.__module__, message, next_state.__name__)

        Router.register_route(self.format, routing_wrapper)
        return routing_wrapper

    def __get__(self, obj, of_type=None):
        """
        This is NOT SUPPORTED.  It is here just so that if you try to apply
        this decorator to a class's method it will barf on you.
        """
        raise NotImplementedError("Not supported on methods yet, only module functions.")

    def parse_format(self, format, captures):
        """Does the grunt work of converting format+captures into the regex."""
        for key in captures:
            format = format.replace("(" + key + ")", "(?P<%s>%s)" % (key, captures[key]))
        return "^" + format + "$"

    def setup_accounting(self, func):
        """Sets up an accounting map attached to the func for routing decorators."""
        attach_lamson_settings(func)
        func._lamson_settings['format'] = self.format
        func._lamson_settings['captures'] = self.captures

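
To see what the format translation produces, the following standalone sketch copies the body of parse_format into a plain function and applies it to the example from the @route docstring. The defaults dictionary stands in for Router.DEFAULT_CAPTURES, and the test address is made up for illustration.

```python
import re


def parse_format(format, captures):
    """Same substitution as route.parse_format, lifted out of the class."""
    for key in captures:
        format = format.replace("(" + key + ")",
                                "(?P<%s>%s)" % (key, captures[key]))
    return "^" + format + "$"


# Stand-in for Router.DEFAULT_CAPTURES, as set through Router.defaults().
defaults = {'host': r'test\.com'}

# Per-route captures are layered over a copy of the defaults.
captures = defaults.copy()
captures.update(list_name='[a-z]+', action='[a-z]+')

pattern = parse_format("(list_name)-(action)@(host)", captures)

# register_route compiles the pattern case-insensitively.
regex = re.compile(pattern, re.IGNORECASE)
match = regex.match("Cats-subscribe@test.com")
keywords = match.groupdict()
```

The named groups in keywords are what the Router passes to the state handler as keyword arguments, so the handler signature should accept list_name, action, and host.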

def lamson_setting(func, key):
    """Simple way to get the lamson setting off the function, or None."""
    return func._lamson_settings.get(key)


def has_lamson_settings(func):
    return "_lamson_settings" in func.__dict__

def assert_lamson_settings(func):
    """Used to make sure that the func has been set up by a routing decorator."""
    assert has_lamson_settings(func), "Function %s has not been set up with a @route first." % func.__name__


def attach_lamson_settings(func):
    """Use this to set up the _lamson_settings if they aren't already there."""
    if '_lamson_settings' not in func.__dict__:
        func._lamson_settings = {}


class route_like(route):
    """
    Many times you want your state handler to just accept mail like another
    handler.  Use this, passing in the other function.  It even works across
    modules.
    """
    def __init__(self, func):
        assert_lamson_settings(func)
        self.format = func._lamson_settings['format']
        self.captures = func._lamson_settings['captures']


def stateless(func):
    """
    This simple decorator is attached to a handler to indicate to the
    Router.deliver() method that it does NOT maintain state or care about it.
    This is how you create a handler that processes all messages matching the
    given format+captures in a @route.

    Another way to think about a @stateless handler is that it is a passthrough
    handler that does its processing and then passes the results on to others.

    Stateless handlers are NOT guaranteed to run before the handler with state.
    """
    if has_lamson_settings(func):
        assert not lamson_setting(func, 'format'), "You must use @stateless AFTER @route or @route_like."

    attach_lamson_settings(func)
    func._lamson_settings['stateless'] = True

    return func

def nolocking(func):
    """
    Normally lamson.routing.Router has a lock around each call to all handlers
    to prevent them from stepping on each other.  It's assumed that 95% of the
    time this is what you want, so it's the default.  You probably want
    everything to go in order and not step on other things going off from other
    threads in the system.

    However, sometimes you know better what you are doing and this is where
    @nolocking comes in.  Put this decorator on state functions where you
    don't care about threading issues or that you have found a need to
    manually tune, and it will run them without any locks.
    """
    attach_lamson_settings(func)
    func._lamson_settings['nolocking'] = True
    return func

def state_key_generator(func):
    """
    Used to indicate that a function in your handlers should be used
    to determine what the key is for state storage.  It should be a
    function that takes the module_name and message being worked on
    and returns a string.
    """
    Router.HANDLERS[func.__module__] = func
    return func

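
A key generator only has to accept the module name and the message and return a string. The sketch below shows one possible shape, keyed on the module plus the recipients. FakeMessage and module_and_to_key are hypothetical names, and the example assumes route_to is a list of addresses (deliver() iterates over it); in a real handler module the function would carry the @state_key_generator decorator.

```python
class FakeMessage(object):
    """Hypothetical stand-in for a Lamson message; only route_to is modeled."""
    def __init__(self, route_to):
        self.route_to = route_to


# In a handler module this would be decorated with @state_key_generator.
def module_and_to_key(module_name, message):
    # Sort the recipients so the same set always yields the same key.
    return module_name + ':' + ','.join(sorted(message.route_to))


message = FakeMessage(["b@example.com", "a@example.com"])
key = module_and_to_key("app.handlers.list", message)
```

With a key like this, the same sender can be in a different state for each recipient address handled by the module, instead of sharing one state per module.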