"""
The meat of Lamson, doing all the work that actually takes an email and makes
sure that your code gets it.

The three most important parts for a programmer are the Router variable, the
StateStorage base class, and the @route, @route_like, and @stateless decorators.

The lamson.routing.Router variable (it's not a class, just named like one) is
how the whole system gets to the Router.  It is an instance of RoutingBase and
there's usually only one.

The lamson.routing.StateStorage is what you need to implement if you want Lamson
to store the state in a different way.  By default the lamson.routing.Router
object just uses a default MemoryStorage to do its job.  If you want to use a
custom storage, then in your config/boot.py (or config/testing.py) you would set
lamson.routing.Router.STATE_STORE to what you want to use.
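
As a rough sketch of that storage contract, the dictionary-backed class below
implements the three methods the Router relies on (get, set, clear).  The
StateStorage stand-in and the LockedDictStorage name are assumptions made so
the example runs without Lamson installed; in a real application you would
subclass lamson.routing.StateStorage instead.

```python
import threading

ROUTE_FIRST_STATE = 'START'  # same value lamson.routing uses


class StateStorage(object):
    # Stand-in for lamson.routing.StateStorage (assumption for this sketch).
    def get(self, key, sender):
        raise NotImplementedError("get")

    def set(self, key, sender, state):
        raise NotImplementedError("set")

    def clear(self):
        raise NotImplementedError("clear")


class LockedDictStorage(StateStorage):
    # Hypothetical storage: an in-memory dict guarded by a lock.

    def __init__(self):
        self.states = {}
        self.lock = threading.RLock()

    def get(self, key, sender):
        # Unknown key/sender pairs report the first state.
        with self.lock:
            return self.states.get((key, sender), ROUTE_FIRST_STATE)

    def set(self, key, sender, state):
        # Going back to the first state simply forgets the entry.
        with self.lock:
            if state == ROUTE_FIRST_STATE:
                self.states.pop((key, sender), None)
            else:
                self.states[(key, sender)] = state

    def clear(self):
        with self.lock:
            self.states.clear()
```

With Lamson available, an instance of such a class would be assigned to
lamson.routing.Router.STATE_STORE in config/boot.py.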

Finally, when you write a state handler, it has functions that act as state
functions for dealing with each state.  To tell the Router what function should
handle what email you use a @route decorator.  To tell the Router that one
function routes the same as another use @route_like.  In the case where a state
function should run on every matching email, just use the @stateless decorator
after a @route or @route_like.
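
The idea of state functions can be sketched without Lamson at all.  In the toy
dispatcher below each state function returns the next state function (or None
to stay where it is), and the dispatcher remembers the name of the current
state per sender.  The deliver function, the STATES dict, and the handler names
are hypothetical; the real Router additionally matches the To address against
the registered @route patterns.

```python
ROUTE_FIRST_STATE = 'START'
STATES = {}  # sender -> name of the state function that runs next


def START(message):
    # First contact: move the sender on to the confirmation step.
    return CONFIRMING


def CONFIRMING(message):
    # Stay in this state until the sender answers 'yes'.
    if message['body'].strip().lower() == 'yes':
        return POSTING
    return None


def POSTING(message):
    # Terminal state for this sketch: never transitions.
    return None


HANDLERS = dict((f.__name__, f) for f in (START, CONFIRMING, POSTING))


def deliver(message):
    # Look up the sender's state, run that function, record the next state.
    sender = message['from']
    current = STATES.get(sender, ROUTE_FIRST_STATE)
    next_state = HANDLERS[current](message)
    if next_state:
        STATES[sender] = next_state.__name__
    return current
```

Returning the function itself rather than a string mirrors how a @route
handler hands its next state back to the Router.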

If at any time you need to debug your routing setup just use the lamson routes
command.

Routing Control
===============

To control routing there is a set of decorators that you apply to your
functions.

* @route -- The main routing function that determines what addresses you are
  interested in.
* @route_like -- Says that this function routes like another one.
* @stateless -- Indicates this function always runs on each route encountered, and
  no state is maintained.
* @nolocking -- Use this if you want this handler to run in parallel without any
  locking around Lamson internals.  SUPER DANGEROUS, add @stateless as well.
* @state_key_generator -- Used on a function that knows how to make your state
  keys for the module, for example if module_name + message.route_to is needed
  to maintain state.

It's best to put @route or @route_like as the first decorator, then the others
after that.
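
Python applies stacked decorators from the bottom up, which is why the order
matters: the decorator nearest the function runs first, and the one written
first runs last, so it can see the settings the others attached.  A small
sketch with stand-in decorators (fake_route and fake_stateless are hypothetical
and not part of Lamson) makes the order visible:

```python
APPLIED = []  # records the order in which the decorators actually ran


def fake_stateless(func):
    # Stand-in for @stateless: marks the function and returns it unchanged.
    APPLIED.append('stateless')
    func.settings = {'stateless': True}
    return func


def fake_route(pattern):
    # Stand-in for @route: runs last, so earlier settings are already there.
    def decorator(func):
        APPLIED.append('route')
        settings = getattr(func, 'settings', {})
        settings['format'] = pattern
        func.settings = settings
        return func
    return decorator


@fake_route('(list_name)@(host)')
@fake_stateless
def LOGGER(message):
    return None
```

After the module loads, APPLIED holds 'stateless' followed by 'route', even
though the route decorator is written first.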

The @state_key_generator is different since it's not intended to go on a handler
but instead on a simple function, so it shouldn't be combined with the others.
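
To illustrate what a key generator changes, here is a sketch using a stand-in
message object.  The default key is just the module name, so a sender has one
state per handler module; a generator that mixes in the recipients gives one
state per module and address.  The Message tuple, the module_and_to function,
and the addresses are hypothetical.

```python
from collections import namedtuple

# Stand-in for a Lamson message (assumption): only the routing fields.
Message = namedtuple('Message', ['route_from', 'route_to'])


def default_key(module_name, message):
    # Mirrors DEFAULT_STATE_KEY: the module name alone.
    return module_name


def module_and_to(module_name, message):
    # Hypothetical generator: one state per module and recipient list.
    return module_name + ':' + ','.join(message.route_to)


def storage_key(key, sender):
    # Same scheme MemoryStorage.key uses to combine key and sender.
    return repr([key, sender])
```

In a handler module a function like module_and_to would carry the
@state_key_generator decorator so the Router uses it for that module.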
"""

from __future__ import with_statement
from functools import wraps
import re
import logging
import sys
import email.utils
import shelve
import threading

ROUTE_FIRST_STATE = 'START'
LOG = logging.getLogger("routing")
DEFAULT_STATE_KEY = lambda mod, msg: mod


class StateStorage(object):
    """
    The base storage class you need to implement for a custom storage
    system.
    """
    def get(self, key, sender):
        """
        You must implement this so that it returns a single string
        of either the state for this combination of arguments, OR
        the ROUTE_FIRST_STATE setting.
        """
        raise NotImplementedError("You have to implement a StateStorage.get.")

    def set(self, key, sender, state):
        """
        Set should take the given parameters and consistently set the state for
        that combination such that when StateStorage.get is called it gives back
        the same setting.
        """
        raise NotImplementedError("You have to implement a StateStorage.set.")

    def clear(self):
        """
        This should clear ALL states.  It is only used in unit testing, so you
        can have it raise an exception if you want to make this safer.
        """
        raise NotImplementedError("You have to implement a StateStorage.clear for unit testing to work.")


class MemoryStorage(StateStorage):
    """
    The default simplified storage for the Router to hold the states.  This
    should only be used in testing, as you'll lose all your contacts and their
    states if your server shuts down.  It is also horribly NOT thread safe.
    """
    def __init__(self):
        self.states = {}

    def get(self, key, sender):
        key = self.key(key, sender)
        try:
            return self.states[key]
        except KeyError:
            return ROUTE_FIRST_STATE

    def set(self, key, sender, state):
        key = self.key(key, sender)
        if state == ROUTE_FIRST_STATE:
            try:
                del self.states[key]
            except KeyError:
                pass
        else:
            self.states[key] = state

    def key(self, key, sender):
        return repr([key, sender])

    def clear(self):
        self.states.clear()


class ShelveStorage(MemoryStorage):
    """
    Uses Python's shelve to store the state of the Routers to disk rather than
    in memory like with MemoryStorage.  This will get you going on a small
    install if you need to persist your states (most likely), but if you
    have a database, you'll need to write your own StateStorage that
    uses your ORM or database to store.  Consider this an example.

    NOTE: Because of shelve limitations you can only use ASCII encoded keys.
    """
    def __init__(self, database_path):
        """Database path depends on the backing library used by Python's shelve."""
        self.database_path = database_path
        self.lock = threading.RLock()

    def get(self, key, sender):
        """
        This will lock the internal thread lock, and then retrieve from the
        shelf whatever key you request.  If the key is not found then it
        will set (atomically) to ROUTE_FIRST_STATE.
        """
        with self.lock:
            self.states = shelve.open(self.database_path)
            value = super(ShelveStorage, self).get(key.encode('ascii'), sender)
            self.states.close()
            return value

    def set(self, key, sender, state):
        """
        Acquires the self.lock and then sets the requested state in the shelf.
        """
        with self.lock:
            self.states = shelve.open(self.database_path)
            super(ShelveStorage, self).set(key.encode('ascii'), sender, state)
            self.states.close()

    def clear(self):
        """
        Primarily used in the debugging/unit testing process to make sure the
        states are clear.  In production this could be a bad thing.
        """
        with self.lock:
            self.states = shelve.open(self.database_path)
            super(ShelveStorage, self).clear()
            self.states.close()


class RoutingBase(object):
    """
    The Router is a globally accessible object that is actually more like a
    glorified module.  It is used mostly internally by the lamson.routing
    decorators (route, route_like, stateless) to control the routing
    mechanism.

    It keeps track of the registered routes, their attached functions, the
    order that these routes should be evaluated, any default routing captures,
    and uses the MemoryStorage by default to keep track of the states.

    You can change the storage to another implementation by simply setting:

        Router.STATE_STORE = OtherStorage()

    In a config/settings.py file.

    RoutingBase does locking on every write to its internal data (which usually
    only happens during booting and reloading while debugging), and when each
    handler's state function is called.  ALL threads will go through this lock,
    but only as each state is run, so you won't have a situation where the chain
    of state functions will block all the others.  This means that while your
    handler runs nothing else will be running, but you have no guarantees about
    the order of each state function.

    However, this can kill the performance of some kinds of state functions,
    so if you find the need to not have locking, then use the @nolocking
    decorator and the Router will NOT lock when that function is called.  That
    means while your @nolocking state function is running at least one other
    thread (more if the next ones happen to be @nolocking) could also be
    running.

    It's your job to keep things straight if you do that.

    NOTE: See @state_key_generator for a way to change what the key is to
    STATE_STORE for different state control options.
    """

    def __init__(self):
        self.REGISTERED = {}
        self.ORDER = []
        self.DEFAULT_CAPTURES = {}
        self.STATE_STORE = MemoryStorage()
        self.HANDLERS = {}
        self.RELOAD = False
        self.LOG_EXCEPTIONS = True
        self.UNDELIVERABLE_QUEUE = None
        self.lock = threading.RLock()
        self.call_lock = threading.RLock()

    def register_route(self, format, func):
        """
        Registers this function func into the routes mapping based on the
        format given.  Format should be a regex string ready to be handed to
        re.compile.
        """
        with self.lock:
            if format in self.REGISTERED:
                self.REGISTERED[format][1].append(func)
            else:
                self.ORDER.append(format)
                self.REGISTERED[format] = (re.compile(format, re.IGNORECASE), [func])

    def match(self, address):
        """
        This is a generator that goes through all the routes and
        yields each match it finds.  It expects you to give it a
        blah@blah.com address, NOT "Joe Blow" <blah@blah.com>.
        """
        for format in self.ORDER:
            regex, functions = self.REGISTERED[format]
            match = regex.match(address)
            if match:
                yield functions, match.groupdict()

    def defaults(self, captures):
        """
        Updates the defaults for routing captures with the given settings.

        You use this in your handlers or your config/settings.py to set
        common regular expressions you'll have in your @route decorators.
        This saves you typing, but also makes it easy to reconfigure later.

        For example, many times you'll have a single host="..." regex
        for all your application's routes.  Put this in your settings.py
        file using route_defaults={'host': '...'} and you're done.
        """
        with self.lock:
            self.DEFAULT_CAPTURES.update(captures)

    def get_state(self, module_name, message):
        """Returns the state that this module is in for the given message (using its from)."""
        key = self.state_key(module_name, message)
        return self.STATE_STORE.get(key, message.route_from)

    def in_state(self, func, message):
        """
        Determines if this function is in the state for the to/from in the
        message.  Doesn't apply to @stateless state handlers.
        """
        state = self.get_state(func.__module__, message)
        return state and state == func.__name__

    def in_error(self, func, message):
        """
        Determines if this function is in the 'ERROR' state,
        which is a special state that the Router puts handlers in that throw
        an exception.
        """
        state = self.get_state(func.__module__, message)
        return state and state == 'ERROR'

    def state_key(self, module_name, message):
        """
        Given a module_name we need to get a state key for, and a
        message that has information to make the key, this function
        calls any registered @state_key_generator and returns that
        as the key.  If none is given then it just returns module_name
        as the key.
        """
        key_func = self.HANDLERS.get(module_name, DEFAULT_STATE_KEY)
        return key_func(module_name, message)

    def set_state(self, module_name, message, state):
        """
        Sets the state of the given module (a string) according to the message to the requested
        state (a string).  This is also how you can force another FSM to a required state.
        """
        key = self.state_key(module_name, message)
        self.STATE_STORE.set(key, message.route_from, state)

    def _collect_matches(self, message, route_to):
        in_state_found = False

        for functions, matchkw in self.match(route_to):
            for func in functions:
                if lamson_setting(func, 'stateless'):
                    yield func, matchkw
                elif not in_state_found and self.in_state(func, message):
                    in_state_found = True
                    yield func, matchkw

    def _enqueue_undeliverable(self, message):
        if self.UNDELIVERABLE_QUEUE:
            LOG.debug("Message to %r from %r undeliverable, putting in undeliverable queue (# of recipients: %d).",
                      message.route_to, message.route_from, len(message.route_to))
            self.UNDELIVERABLE_QUEUE.push(message)
        else:
            LOG.debug("Message to %r from %r didn't match any handlers. (# recipients: %d)",
                      message.route_to, message.route_from, len(message.route_to))

    def deliver(self, message):
        """
        The meat of the whole Lamson operation, this method takes all the
        arguments given, and then goes through the routing listing to figure out
        which state handlers should get the gear.  The routing operates on a
        simple set of rules:

            1) Match on all functions that match the given To in their
            registered format pattern.
            2) Call all @stateless state handlers functions.
            3) Call the first method that's in the right state for the From/To.

        It will log which handlers are being run, and you can use the 'lamson routes'
        command to inspect and debug routing problems.

        If you have an ERROR state function, then when your state blows up, it will
        transition to ERROR state and call your function right away.  It will then
        stay in the ERROR state unless you return a different one.
        """
        if self.RELOAD:
            self.reload()

        called_count = 0

        for routing_on in message.route_to:
            for func, matchkw in self._collect_matches(message, routing_on):
                LOG.debug("Matched %r against %s.", routing_on, func.__name__)

                if lamson_setting(func, 'nolocking'):
                    self.call_safely(func, message, matchkw)
                else:
                    with self.call_lock:
                        self.call_safely(func, message, matchkw)

                called_count += 1

        if called_count == 0:
            self._enqueue_undeliverable(message)

    def call_safely(self, func, message, kwargs):
        """
        Used by the Router to call a function and log exceptions rather than
        explode and crash.
        """
        from lamson.server import SMTPError

        try:
            func(message, **kwargs)
            LOG.debug("Message to %s was handled by %s.%s",
                      message.route_to, func.__module__, func.__name__)
        except SMTPError:
            raise
        except:
            self.set_state(func.__module__, message, 'ERROR')

            if self.UNDELIVERABLE_QUEUE:
                self.UNDELIVERABLE_QUEUE.push(message)

            if self.LOG_EXCEPTIONS:
                LOG.exception("!!! ERROR handling %s.%s", func.__module__, func.__name__)
            else:
                raise

    def clear_states(self):
        """Clears out the states for unit testing."""
        with self.lock:
            self.STATE_STORE.clear()

    def clear_routes(self):
        """Clears out the routes for unit testing and reloading."""
        with self.lock:
            self.REGISTERED.clear()
            del self.ORDER[:]

    def load(self, handlers):
        """
        Loads the listed handlers making them available for processing.
        This is safe to call multiple times and to duplicate handlers
        listed.
        """
        with self.lock:
            for module in handlers:
                try:
                    __import__(module, globals(), locals())

                    if module not in self.HANDLERS:
                        # No @state_key_generator registered a key function
                        # for this module, so fall back to the default.
                        self.HANDLERS[module] = DEFAULT_STATE_KEY
                except:
                    if self.LOG_EXCEPTIONS:
                        LOG.exception("ERROR IMPORTING %r MODULE:" % module)
                    else:
                        raise

    def reload(self):
        """
        Performs a reload of all the handlers and clears out all routes,
        but doesn't touch the internal state.
        """
        with self.lock:
            self.clear_routes()
            for module in sys.modules.keys():
                if module in self.HANDLERS:
                    try:
                        reload(sys.modules[module])
                    except:
                        if self.LOG_EXCEPTIONS:
                            LOG.exception("ERROR RELOADING %r MODULE:" % module)
                        else:
                            raise


Router = RoutingBase()


class route(object):
    """
    The @route decorator is attached to state handlers to configure them in the
    Router so they handle messages for them.  Routing does not depend only on
    the address a message is sent to; it also uses the state of the sender.
    It's like having routing in a web application use both the URL and an
    internal state setting to determine which method to run.

    However, if you'd rather have this state handler process all messages
    matching the @route then tag it @stateless.  This will run the handler
    no matter what and not change the user's state.
    """

    def __init__(self, format, **captures):
        """
        Sets up the pattern used for the Router configuration.  The format
        parameter is a simple pattern of words, captures, and anything you
        want to ignore.  The captures parameter is a mapping of the words in
        the format to regex that get put into the format.  When the pattern is
        matched, the captures are handed to your state handler as keyword
        arguments.

        For example, if you have:

            @route("(list_name)-(action)@(host)",
                    list_name='[a-z]+',
                    action='[a-z]+', host='test\.com')
            def STATE(message, list_name=None, action=None, host=None):
                ....

        Then this will be translated so that list_name is replaced with [a-z]+,
        action with [a-z]+, and host with 'test.com' to produce a regex with the
        right format and named captures so that your state handler is called
        with the proper keyword parameters.

        You should also use the Router.defaults() to set default things like the
        host so that you are not putting it into your code.
        """
        self.captures = Router.DEFAULT_CAPTURES.copy()
        self.captures.update(captures)
        self.format = self.parse_format(format, self.captures)

    def __call__(self, func):
        """Returns either a decorator that does a stateless routing or
        a normal routing."""
        self.setup_accounting(func)

        if lamson_setting(func, 'stateless'):
            @wraps(func)
            def routing_wrapper(message, *args, **kw):
                next_state = func(message, *args, **kw)
        else:
            @wraps(func)
            def routing_wrapper(message, *args, **kw):
                next_state = func(message, *args, **kw)

                if next_state:
                    Router.set_state(next_state.__module__, message, next_state.__name__)

        Router.register_route(self.format, routing_wrapper)
        return routing_wrapper

    def __get__(self, obj, of_type=None):
        """
        This is NOT SUPPORTED.  It is here just so that if you try to apply
        this decorator to a class's method it will barf on you.
        """
        raise NotImplementedError("Not supported on methods yet, only module functions.")

    def setup_accounting(self, func):
        """Sets up an accounting map attached to the func for routing decorators."""
        attach_lamson_settings(func)
        func._lamson_settings['format'] = self.format
        func._lamson_settings['captures'] = self.captures


def lamson_setting(func, key):
    """Simple way to get the lamson setting off the function, or None."""
    return func._lamson_settings.get(key)


def has_lamson_settings(func):
    return "_lamson_settings" in func.__dict__


def assert_lamson_settings(func):
    """Used to make sure that the func has been set up by a routing decorator."""
    assert has_lamson_settings(func), "Function %s has not been set up with a @route first." % func.__name__


def attach_lamson_settings(func):
    """Use this to set up the _lamson_settings if they aren't already there."""
    if '_lamson_settings' not in func.__dict__:
        func._lamson_settings = {}


class route_like(route):
    """
    Many times you want your state handler to just accept mail like another
    handler.  Use this, passing in the other function.  It even works across
    modules.
    """
    def __init__(self, func):
        assert_lamson_settings(func)
        self.format = func._lamson_settings['format']
        self.captures = func._lamson_settings['captures']


def stateless(func):
    """
    This simple decorator is attached to a handler to indicate to the
    Router.deliver() method that it does NOT maintain state or care about it.
    This is how you create a handler that processes all messages matching the
    given format+captures in a @route.

    Another way to think about a @stateless handler is that it is a passthrough
    handler that does its processing and then passes the results on to others.

    Stateless handlers are NOT guaranteed to run before the handler with state.
    """
    if has_lamson_settings(func):
        assert not lamson_setting(func, 'format'), "You must use @stateless AFTER @route or @route_like."

    attach_lamson_settings(func)
    func._lamson_settings['stateless'] = True

    return func


def nolocking(func):
    """
    Normally lamson.routing.Router has a lock around each call to all handlers
    to prevent them from stepping on each other.  It's assumed that 95% of the
    time this is what you want, so it's the default.  You probably want
    everything to go in order and not step on other things going off from other
    threads in the system.

    However, sometimes you know better what you are doing and this is where
    @nolocking comes in.  Put this decorator on state functions where you
    don't care about threading issues or where you have found a need to
    manually tune, and the Router will run them without any locks.
    """
    attach_lamson_settings(func)
    func._lamson_settings['nolocking'] = True
    return func


def state_key_generator(func):
    """
    Used to indicate that a function in your handlers should be used
    to determine what the key is for state storage.  It should be a
    function that takes the module_name and message being worked on
    and returns a string.
    """
    Router.HANDLERS[func.__module__] = func
    return func