]> git.sommitrealweird.co.uk Git - eoc.git/blob - eocTests.py
acf99b326b00fe2b2c32df608606e852201fbb12
[eoc.git] / eocTests.py
1 import unittest
2 import shutil
3 import os
4 import re
5 import time
6 import string
7
8 import eoc
9
10 DOTDIR = "dot-dir-for-testing"
11 eoc.DOTDIR = DOTDIR
12 eoc.quiet = 1
13
14 def no_op(*args):
15     pass
16
17 class AddressCleaningTestCases(unittest.TestCase):
18
19     def setUp(self):
20         self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
21                                      "bar@lists.example.com"])
22     
23     def verify(self, address, wanted, skip_prefix=None, forced_domain=None):
24         self.ap.set_skip_prefix(skip_prefix)
25         self.ap.set_forced_domain(forced_domain)
26         address = self.ap.clean(address)
27         self.failUnlessEqual(address, wanted)
28     
29     def testSimpleAddress(self):
30         self.verify("foo@example.com", "foo@example.com")
31
32     def testUpperCaseAddress(self):
33         self.verify("FOO@EXAMPLE.COM", "foo@example.com")
34
35     def testPrefixRemoval(self):
36         self.verify("foo@example.com", "foo@example.com", 
37                     skip_prefix="user-")
38         self.verify("user-foo@example.com", "foo@example.com", 
39                     skip_prefix="user-")
40
41     def testForcedDomain(self):
42         self.verify("foo@example.com", "foo@example.com",
43                     forced_domain="example.com")
44         self.verify("foo@whatever.example.com", "foo@example.com", 
45                     forced_domain="example.com")
46
47     def testPrefixRemovalWithForcedDomain(self):
48         self.verify("foo@example.com", "foo@example.com", 
49                     skip_prefix="user-",
50                     forced_domain="example.com")
51         self.verify("foo@whatever.example.com", "foo@example.com", 
52                     skip_prefix="user-",
53                     forced_domain="example.com")
54         self.verify("user-foo@example.com", "foo@example.com", 
55                     skip_prefix="user-",
56                     forced_domain="example.com")
57         self.verify("user-foo@whatever.example.com", "foo@example.com", 
58                     skip_prefix="user-",
59                     forced_domain="example.com")
60
61 class AddressParserTestCases(unittest.TestCase):
62
63     def setUp(self):
64         self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
65                                      "bar@lists.example.com"])
66     
67     def verify_parser(self, address, wanted_listname, wanted_parts):
68         listname, parts = self.ap.parse(address)
69         self.failUnlessEqual(listname, wanted_listname)
70         self.failUnlessEqual(parts, wanted_parts)
71         
72     def testParser(self):
73         self.verify_parser("foo@example.com", 
74                            "foo@EXAMPLE.com", 
75                            [])
76         self.verify_parser("foo-subscribe@example.com", 
77                            "foo@EXAMPLE.com", 
78                            ["subscribe"])
79         self.verify_parser("foo-subscribe-joe=example.com@example.com", 
80                            "foo@EXAMPLE.com", 
81                            ["subscribe", "joe=example.com"])
82         self.verify_parser("foo-bounce-123-ABCDEF@example.com", 
83                            "foo@EXAMPLE.com", 
84                            ["bounce", "123", "abcdef"])
85
86 class ParseRecipientAddressBase(unittest.TestCase):
87
88     def setUp(self):
89         self.lists = ["foo@example.com", 
90                       "bar@lists.example.com",
91                       "foo-announce@example.com"]
92         self.mlm = eoc.MailingListManager(DOTDIR, lists=self.lists)
93     
94     def environ(self, sender, recipient):
95         eoc.set_environ({
96             "SENDER": sender,
97             "RECIPIENT": recipient,
98         })
99
100 class ParseUnsignedAddressTestCases(ParseRecipientAddressBase):
101
102     def testEmpty(self):
103         self.failUnlessRaises(eoc.UnknownList,
104                               self.mlm.parse_recipient_address, 
105                               "", None, None)
106
107     def verify(self, address, skip_prefix, forced_domain, wanted_dict):
108         dict = self.mlm.parse_recipient_address(address, skip_prefix, 
109                                                 forced_domain)
110         self.failUnlessEqual(dict, wanted_dict)
111
112     def testSimpleAddresses(self):
113         self.verify("foo@example.com", 
114                     None, 
115                     None, 
116                     { "name": "foo@example.com", "command": "post" })
117         self.verify("FOO@EXAMPLE.COM", 
118                     None, 
119                     None, 
120                     { "name": "foo@example.com", "command": "post" })
121         self.verify("prefix-foo@example.com", 
122                     "prefix-", 
123                     None, 
124                     { "name": "foo@example.com", "command": "post" })
125         self.verify("bar@example.com", 
126                     None, 
127                     "lists.example.com", 
128                     { "name": "bar@lists.example.com", "command": "post" })
129         self.verify("prefix-bar@example.com", 
130                     "prefix-",
131                     "lists.example.com", 
132                     { "name": "bar@lists.example.com", "command": "post" })
133
134     def testSubscription(self):
135         self.verify("foo-subscribe@example.com", 
136                     None,
137                     None,
138                     { "name": "foo@example.com", 
139                       "command": "subscribe",
140                       "sender": "",
141                     })
142         self.verify("foo-subscribe-joe-user=example.com@example.com", 
143                     None,
144                     None,
145                     { "name": "foo@example.com", 
146                       "command": "subscribe",
147                       "sender": "joe-user@example.com",
148                     })
149         self.verify("foo-unsubscribe@example.com", 
150                     None,
151                     None,
152                     { "name": "foo@example.com", 
153                       "command": "unsubscribe",
154                       "sender": "",
155                     })
156         self.verify("foo-unsubscribe-joe-user=example.com@example.com", 
157                     None,
158                     None,
159                     { "name": "foo@example.com", 
160                       "command": "unsubscribe",
161                       "sender": "joe-user@example.com",
162                     })
163
164     def testPost(self):
165         for name in self.lists:
166             self.verify(name, None, None, { "name": name, "command": "post" })
167
168     def testSimpleCommands(self):
169         for name in self.lists:
170             for command in ["help", "list", "owner"]:
171                 localpart, domain = name.split("@")
172                 address = "%s-%s@%s" % (localpart, command, domain)
173                 self.verify(address, None, None,
174                             { "name": name,
175                               "command": command
176                             })
177
178 class ParseWellSignedAddressTestCases(ParseRecipientAddressBase):
179
180     def try_good_signature(self, command):
181         s = "foo-announce-%s-1" % command
182         hash = self.mlm.compute_hash("%s@%s" % (s, "example.com"))
183         local_part = "%s-%s" % (s, hash)
184         dict = self.mlm.parse_recipient_address("%s@example.com" % local_part,
185                                                 None, None)
186         self.failUnlessEqual(dict,
187                              {
188                                 "name": "foo-announce@example.com",
189                                 "command": command,
190                                 "id": "1",
191                              })
192
193     def testProperlySignedCommands(self):
194         self.try_good_signature("subyes")
195         self.try_good_signature("subapprove")
196         self.try_good_signature("subreject")
197         self.try_good_signature("unsubyes")
198         self.try_good_signature("bounce")
199         self.try_good_signature("approve")
200         self.try_good_signature("reject")
201         self.try_good_signature("probe")
202
203 class ParseBadlySignedAddressTestCases(ParseRecipientAddressBase):
204
205     def try_bad_signature(self, command_part):
206         self.failUnlessRaises(eoc.BadSignature,
207                               self.mlm.parse_recipient_address, 
208                               "foo-announce-" + command_part + 
209                                     "-123-badhash@example.com",
210                               None, None)
211
212     def testBadlySignedCommands(self):
213         self.try_bad_signature("subyes")
214         self.try_bad_signature("subapprove")
215         self.try_bad_signature("subreject")
216         self.try_bad_signature("unsubyes")
217         self.try_bad_signature("bounce")
218         self.try_bad_signature("approve")
219         self.try_bad_signature("reject")
220         self.try_bad_signature("probe")
221
222 class DotDirTestCases(unittest.TestCase):
223
224     def setUp(self):
225         self.secret_name = os.path.join(DOTDIR, "secret")
226
227     def tearDown(self):
228         shutil.rmtree(DOTDIR)
229
230     def dotdir_is_ok(self):
231         self.failUnless(os.path.isdir(DOTDIR))
232         self.failUnless(os.path.isfile(self.secret_name))
233
234     def testNoDotDirExists(self):
235         self.failIf(os.path.exists(DOTDIR))
236         mlm = eoc.MailingListManager(DOTDIR)
237         self.dotdir_is_ok()
238
239     def testDotDirDoesExistButSecretDoesNot(self):
240         self.failIf(os.path.exists(DOTDIR))
241         os.makedirs(DOTDIR)
242         self.failUnless(os.path.isdir(DOTDIR))
243         self.failIf(os.path.exists(self.secret_name))
244         mlm = eoc.MailingListManager(DOTDIR)
245         self.dotdir_is_ok()
246
247 class ListBase(unittest.TestCase):
248
249     def setUp(self):
250         if os.path.exists(DOTDIR):
251             shutil.rmtree(DOTDIR)
252         self.mlm = eoc.MailingListManager(DOTDIR)
253
254     def tearDown(self):
255         self.mlm = None
256         shutil.rmtree(DOTDIR)
257
258 class ListCreationTestCases(ListBase):
259
260     def setUp(self):
261         ListBase.setUp(self)
262         self.names = None
263
264     def listdir(self, listname):
265         return os.path.join(DOTDIR, listname)
266
267     def listdir_has_file(self, listdir, filename):
268         self.failUnless(os.path.isfile(os.path.join(listdir, filename)))
269         self.names.remove(filename)
270
271     def listdir_has_dir(self, listdir, dirname):
272         self.failUnless(os.path.isdir(os.path.join(listdir, dirname)))
273         self.names.remove(dirname)
274
275     def listdir_may_have_dir(self, listdir, dirname):
276         if dirname in self.names:
277             self.listdir_has_dir(listdir, dirname)
278
279     def listdir_is_ok(self, listname):
280         listdir = self.listdir(listname)
281         self.failUnless(os.path.isdir(listdir))
282         self.names = os.listdir(listdir)
283         
284         self.listdir_has_file(listdir, "config")
285         self.listdir_has_file(listdir, "subscribers")
286             
287         self.listdir_has_dir(listdir, "bounce-box")
288         self.listdir_has_dir(listdir, "subscription-box")
289             
290         self.listdir_may_have_dir(listdir, "moderation-box")
291         self.listdir_may_have_dir(listdir, "templates")
292             
293         # Make sure there are no extras.
294         self.failUnlessEqual(self.names, [])
295
296     def testCreateNew(self):
297         self.failIf(os.path.exists(self.listdir("foo@example.com")))
298         ml = self.mlm.create_list("foo@example.com")
299
300         self.failUnlessEqual(ml.__class__, eoc.MailingList)
301         self.failUnlessEqual(ml.dirname, self.listdir("foo@example.com"))
302
303         self.listdir_is_ok("foo@example.com")
304
305     def testCreateExisting(self):
306         list = self.mlm.create_list("foo@example.com")
307         self.failUnlessRaises(eoc.ListExists,
308                               self.mlm.create_list, "foo@example.com")
309         self.listdir_is_ok("foo@example.com")
310
311 class ListOptionTestCases(ListBase):
312
313     def check(self, ml, wanted):
314         self.failUnlessEqual(ml.cp.sections(), ["list"])
315         cpdict = {}
316         for key, value in ml.cp.items("list"):
317             cpdict[key] = value
318         self.failUnlessEqual(cpdict, wanted)
319
320     def testDefaultOptionsOnCreateAndOpenExisting(self):
321         self.mlm.create_list("foo@example.com")
322         ml = self.mlm.open_list("foo@example.com")
323         self.check(ml,
324                    {
325                       "owners": "",
326                       "moderators": "",
327                       "subscription": "free",
328                       "posting": "free",
329                       "archived": "no",
330                       "mail-on-subscription-changes": "no",
331                       "mail-on-forced-unsubscribe": "no",
332                       "ignore-bounce": "no",
333                       "language": "",
334                       "pristine-headers": "",
335                    })
336
337     def testChangeOptions(self):
338         # Create a list, change some options, and save the result.
339         ml = self.mlm.create_list("foo@example.com")
340         self.failUnlessEqual(ml.cp.get("list", "owners"), "")
341         self.failUnlessEqual(ml.cp.get("list", "posting"), "free")
342         ml.cp.set("list", "owners", "owner@example.com")
343         ml.cp.set("list", "posting", "moderated")
344         ml.save_config()
345         
346         # Re-open the list and check that the new instance has read the
347         # values from the disk correctly.
348         ml2 = self.mlm.open_list("foo@example.com")
349         self.check(ml2,
350                    {
351                       "owners": "owner@example.com",
352                       "moderators": "",
353                       "subscription": "free",
354                       "posting": "moderated",
355                       "archived": "no",
356                       "mail-on-subscription-changes": "no",
357                       "mail-on-forced-unsubscribe": "no",
358                       "ignore-bounce": "no",
359                       "language": "",
360                       "pristine-headers": "",
361                    })
362
363 class SubscriberDatabaseTestCases(ListBase):
364
365     def has_subscribers(self, ml, addrs):
366         subs = ml.subscribers.get_all()
367         subs.sort()
368         self.failUnlessEqual(subs, addrs)
369
370     def testAddAndRemoveSubscribers(self):
371         addrs = ["joe@example.com", "MARY@example.com", "bubba@EXAMPLE.com"]
372         addrs.sort()
373     
374         ml = self.mlm.create_list("foo@example.com")
375         self.failUnlessEqual(ml.subscribers.get_all(), [])
376
377         self.failUnless(ml.subscribers.lock())
378         ml.subscribers.add_many(addrs)
379         self.has_subscribers(ml, addrs)
380         ml.subscribers.save()
381         self.failIf(ml.subscribers.locked)
382         ml = None
383
384         ml2 = self.mlm.open_list("foo@example.com")
385         self.has_subscribers(ml2, addrs)
386         ml2.subscribers.lock()
387         ml2.subscribers.remove(addrs[0])
388         self.has_subscribers(ml2, addrs[1:])
389         
390         ml2.subscribers.save()
391         
392         ml3 = self.mlm.open_list("foo@example.com")
393         self.has_subscribers(ml3, addrs[1:])
394
395     def testSubscribeTwice(self):
396         ml = self.mlm.create_list("foo@example.com")
397         self.failUnlessEqual(ml.subscribers.get_all(), [])
398         ml.subscribers.lock()
399         ml.subscribers.add("user@example.com")
400         ml.subscribers.add("USER@example.com")
401         self.failUnlessEqual(map(string.lower, ml.subscribers.get_all()),
402                              map(string.lower, ["USER@example.com"]))
403
404     def testSubscriberAttributesAndGroups(self):
405         addrs = ["joe@example.com", "mary@example.com"]
406         addrs.sort()
407         ml = self.mlm.create_list("foo@example.com")
408         self.failUnlessEqual(ml.subscribers.groups(), [])
409         ml.subscribers.lock()
410         id = ml.subscribers.add_many(addrs)
411         self.failUnlessEqual(ml.subscribers.groups(), ["0"])
412         self.failUnlessEqual(ml.subscribers.get(id, "status"), "ok")
413         ml.subscribers.set(id, "status", "bounced")
414         self.failUnlessEqual(ml.subscribers.get(id, "status"), "bounced")
415         subs = ml.subscribers.in_group(id)
416         subs.sort()
417         self.failUnlessEqual(subs, addrs)
418
419 class ModerationBoxTestCases(ListBase):
420
421     def testModerationBox(self):
422         ml = self.mlm.create_list("foo@example.com")
423         listdir = os.path.join(DOTDIR, "foo@example.com")
424         boxdir = os.path.join(listdir, "moderation-box")
425
426         self.failUnlessEqual(boxdir, ml.moderation_box.boxdir)
427         self.failUnless(os.path.isdir(boxdir))
428
429         mailtext = "From: foo\nTo: bar\n\nhello\n"
430         id = ml.moderation_box.add("foo", mailtext)
431         self.failUnless(ml.moderation_box.has(id))
432         self.failUnlessEqual(ml.moderation_box.get_address(id), "foo")
433         self.failUnlessEqual(ml.moderation_box.get(id), mailtext)
434         
435         filename = os.path.join(boxdir, id)
436         self.failUnless(os.path.isfile(filename))
437         self.failUnless(os.path.isfile(filename + ".address"))
438         
439         ml.moderation_box.remove(id)
440         self.failIf(ml.moderation_box.has(id))
441         self.failUnless(not os.path.exists(filename))
442
443 class IncomingBase(unittest.TestCase):
444
445     def setUp(self):
446         if os.path.isdir(DOTDIR):
447             shutil.rmtree(DOTDIR)
448         self.mlm = eoc.MailingListManager(DOTDIR)
449         self.ml = None
450         ml = self.mlm.create_list("foo@EXAMPLE.com")
451         ml.cp.set("list", "owners", "listmaster@example.com")
452         ml.save_config()
453         ml.subscribers.lock()
454         ml.subscribers.add("USER1@example.com")
455         ml.subscribers.add("user2@EXAMPLE.com")
456         ml.subscribers.save()
457         self.write_file_in_listdir(ml, "headers-to-add", "X-Foo: foo\n")
458         self.write_file_in_listdir(ml, "headers-to-remove", "Received\n")
459         self.sent_mail = []
460
461     def tearDown(self):
462         shutil.rmtree(DOTDIR)
463
464     def write_file_in_listdir(self, ml, basename, contents):
465         f = open(os.path.join(ml.dirname, basename), "w")
466         f.write(contents)
467         f.close()
468
469     def configure_list(self, subscription, posting):
470         list = self.mlm.open_list("foo@example.com")
471         list.cp.set("list", "subscription", subscription)
472         list.cp.set("list", "posting", posting)
473         list.save_config()
474
475     def environ(self, sender, recipient):
476         eoc.set_environ({
477             "SENDER": sender,
478             "RECIPIENT": recipient,
479         })
480
481     def catch_sendmail(self, sender, recipients, text):
482         self.sent_mail.append({
483             "sender": sender,
484             "recipients": recipients,
485             "text": text,
486         })
487
488     def send(self, sender, recipient, text="", force_moderation=0, 
489              force_posting=0):
490         self.environ(sender, recipient)
491         dict = self.mlm.parse_recipient_address(recipient, None, None)
492         dict["force-moderation"] = force_moderation
493         dict["force-posting"] = force_posting
494         self.ml = self.mlm.open_list(dict["name"])
495         if "\n\n" not in text:
496             text = "\n\n" + text
497         text = "Received: foobar\n" + text
498         self.ml.read_stdin = lambda t=text: t
499         self.mlm.send_mail = self.catch_sendmail
500         self.sent_mail = []
501         self.ml.obey(dict)
502
503     def sender_matches(self, mail, sender):
504         pat = "(?P<address>" + sender + ")"
505         m = re.match(pat, mail["sender"], re.I)
506         if m:
507             return m.group("address")
508         else:
509             return None
510         
511     def replyto_matches(self, mail, replyto):
512         pat = "(.|\n)*(?P<address>" + replyto + ")"
513         m = re.match(pat, mail["text"], re.I)
514         if m:
515             return m.group("address")
516         else:
517             return None
518
519     def receiver_matches(self, mail, recipient):
520         return map(string.lower, mail["recipients"]) == [recipient.lower()]
521
522     def body_matches(self, mail, body):
523         if body:
524             pat = re.compile("(.|\n)*" + body + "(.|\n)*")
525             m = re.match(pat, mail["text"])
526             return m
527         else:
528             return 1
529
530     def headers_match(self, mail, header):
531         if header:
532             pat = re.compile("(.|\n)*" + header + "(.|\n)*", re.I)
533             m = re.match(pat, mail["text"])
534             return m
535         else:
536             return 1
537
538     def match(self, sender, replyto, receiver, body=None, header=None,
539               anti_header=None):
540         ret = None
541         for mail in self.sent_mail:
542             if replyto is None:
543                 m1 = self.sender_matches(mail, sender)
544                 m3 = self.receiver_matches(mail, receiver)
545                 m4 = self.body_matches(mail, body)
546                 m5 = self.headers_match(mail, header)
547                 m6 = self.headers_match(mail, anti_header)
548                 no_anti_header = anti_header == None or m6 == None
549                 if m1 != None and m3 and m4 and m5 and no_anti_header:
550                     ret = m1
551                     self.sent_mail.remove(mail)
552                     break
553             else:
554                 m1 = self.sender_matches(mail, sender)
555                 m2 = self.replyto_matches(mail, replyto)
556                 m3 = self.receiver_matches(mail, receiver)
557                 m4 = self.body_matches(mail, body)
558                 m5 = self.headers_match(mail, header)
559                 m6 = self.headers_match(mail, anti_header)
560                 no_anti_header = anti_header == None or m6 == None
561                 if m1 != None and m2 != None and m3 and m4 and m5 and \
562                    no_anti_header:
563                     ret = m2
564                     self.sent_mail.remove(mail)
565                     break
566         self.failUnless(ret != None)
567         return ret
568
569     def no_more_mail(self):
570         self.failUnlessEqual(self.sent_mail, [])
571
572
573 class SimpleCommandAddressTestCases(IncomingBase):
574
575     def testHelp(self):
576         self.send("outsider@example.com", "foo-help@example.com")
577         self.match("foo-ignore@example.com", None, "outsider@example.com", 
578                    "Subject: Help for")
579         self.no_more_mail()
580
581     def testOwner(self):
582         self.send("outsider@example.com", "foo-owner@example.com", "abcde")
583         self.match("outsider@example.com", None, "listmaster@example.com",
584                    "abcde")
585         self.no_more_mail()
586
587     def testIgnore(self):
588         self.send("outsider@example.com", "foo-ignore@example.com", "abcde")
589         self.no_more_mail()
590
591 class OwnerCommandTestCases(IncomingBase):
592
593     def testList(self):
594         self.send("listmaster@example.com", "foo-list@example.com")
595         self.match("foo-ignore@example.com", None, "listmaster@example.com",
596                    "[uU][sS][eE][rR][12]@" +
597                         "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n" +
598                    "[uU][sS][eE][rR][12]@" +
599                         "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n")
600         self.no_more_mail()
601
602     def testListDenied(self):
603         self.send("outsider@example.com", "foo-list@example.com")
604         self.match("foo-ignore@example.com", None, "outsider@example.com", 
605                    "Subject: Subscriber list denied")
606         self.no_more_mail()
607
608     def testSetlist(self):
609         self.send("listmaster@example.com", "foo-setlist@example.com",
610                   "From: foo\n\nnew1@example.com\nuser1@example.com\n")
611         a = self.match("foo-ignore@example.com", 
612                        "foo-setlistyes-[^@]*@example.com", 
613                        "listmaster@example.com", 
614                        "Subject: Please moderate subscriber list")
615         self.no_more_mail()
616         
617         self.send("listmaster@example.com", a)
618         self.match("foo-ignore@example.com", None, "listmaster@example.com",
619                    "Subject: Subscriber list has been changed")
620         self.match("foo-ignore@example.com", None, "new1@example.com",
621                    "Subject: Welcome to")
622         self.match("foo-ignore@example.com", None, "user2@EXAMPLE.com",
623                    "Subject: Goodbye from")
624         self.no_more_mail()
625
626     def testSetlistSilently(self):
627         self.send("listmaster@example.com", "foo-setlistsilently@example.com",
628                   "From: foo\n\nnew1@example.com\nuser1@example.com\n")
629         a = self.match("foo-ignore@example.com", 
630                        "foo-setlistsilentyes-[^@]*@example.com", 
631                        "listmaster@example.com", 
632                        "Subject: Please moderate subscriber list")
633         self.no_more_mail()
634         
635         self.send("listmaster@example.com", a)
636         self.match("foo-ignore@example.com", None, "listmaster@example.com",
637                    "Subject: Subscriber list has been changed")
638         self.no_more_mail()
639
640     def testSetlistDenied(self):
641         self.send("outsider@example.com", "foo-setlist@example.com",
642                   "From: foo\n\nnew1@example.com\nnew2@example.com\n")
643         self.match("foo-ignore@example.com", 
644                    None,
645                    "outsider@example.com", 
646                    "Subject: You can't set the subscriber list")
647         self.no_more_mail()
648
649     def testSetlistBadlist(self):
650         self.send("listmaster@example.com", "foo-setlist@example.com",
651                   "From: foo\n\nBlah blah blah.\n")
652         self.match("foo-ignore@example.com", 
653                    None,
654                    "listmaster@example.com", 
655                    "Subject: Bad address list")
656         self.no_more_mail()
657
658     def testOwnerSubscribesSomeoneElse(self):
659         # Send subscription request. List sends confirmation request.
660         self.send("listmaster@example.com",
661                   "foo-subscribe-outsider=example.com@example.com")
662         a = self.match("foo-ignore@example.com", 
663                        "foo-subyes-[^@]*@example.com", 
664                        "listmaster@example.com",
665                        "Please confirm subscription")
666         self.no_more_mail()
667         
668         # Confirm sub. req. List sends welcome.
669         self.send("listmaster@example.com", a)
670         self.match("foo-ignore@example.com", 
671                    None, 
672                    "outsider@example.com", 
673                    "Welcome to the")
674         self.no_more_mail()
675
676     def testOwnerUnubscribesSomeoneElse(self):
677         # Send unsubscription request. List sends confirmation request.
678         self.send("listmaster@example.com",
679                   "foo-unsubscribe-outsider=example.com@example.com")
680         a = self.match("foo-ignore@example.com", 
681                        "foo-unsubyes-[^@]*@example.com", 
682                        "listmaster@example.com",
683                        "Subject: Please confirm UNsubscription")
684         self.no_more_mail()
685         
686         # Confirm sub. req. List sends welcome.
687         self.send("listmaster@example.com", a)
688         self.match("foo-ignore@example.com", None, "outsider@example.com", 
689                    "Goodbye")
690         self.no_more_mail()
691
692 class SubscriptionTestCases(IncomingBase):
693
694     def confirm(self, recipient):
695         # List has sent confirmation request. Respond to it.
696         a = self.match("foo-ignore@example.com", 
697                        "foo-subyes-[^@]*@example.com", 
698                        recipient,
699                        "Please confirm subscription")
700         self.no_more_mail()
701         
702         # Confirm sub. req. List response will be analyzed later.
703         self.send("something.random@example.com", a)
704
705     def got_welcome(self, recipient):
706         self.match("foo-ignore@example.com", 
707                    None, 
708                    recipient, 
709                    "Welcome to the")
710         self.no_more_mail()
711
712     def approve(self, user_recipient):
713         self.match("foo-ignore@example.com", None, user_recipient)
714         a = self.match("foo-ignore@example.com", 
715                        "foo-subapprove-[^@]*@example.com",
716                        "listmaster@example.com")
717         self.send("listmaster@example.com", a)
718
719     def reject(self, user_recipient):
720         self.match("foo-ignore@example.com", None, user_recipient)
721         a = self.match("foo-ignore@example.com", 
722                        "foo-subreject-[^@]*@example.com",
723                        "listmaster@example.com")
724         self.send("listmaster@example.com", a)
725
726     def testSubscribeToUnmoderatedWithoutAddressNotOnList(self):
727         self.configure_list("free", "free")
728         self.send("outsider@example.com", "foo-subscribe@example.com")
729         self.confirm("outsider@example.com")
730         self.got_welcome("outsider@example.com")
731
732     def testSubscribeToUnmoderatedWithoutAddressAlreadyOnList(self):
733         self.configure_list("free", "free")
734         self.send("user1@example.com", "foo-subscribe@example.com")
735         self.confirm("user1@example.com")
736         self.got_welcome("user1@example.com")
737
738     def testSubscribeToUnmoderatedWithAddressNotOnList(self):
739         self.configure_list("free", "free")
740         self.send("somebody.else@example.com", 
741                   "foo-subscribe-outsider=example.com@example.com")
742         self.confirm("outsider@example.com")
743         self.got_welcome("outsider@example.com")
744
745     def testSubscribeToUnmoderatedWithAddressAlreadyOnList(self):
746         self.configure_list("free", "free")
747         self.send("somebody.else@example.com", 
748                   "foo-subscribe-user1=example.com@example.com")
749         self.confirm("user1@example.com")
750         self.got_welcome("user1@example.com")
751
752     def testSubscribeToModeratedWithoutAddressNotOnListApproved(self):
753         self.configure_list("moderated", "moderated")
754         self.send("outsider@example.com", "foo-subscribe@example.com")
755         self.confirm("outsider@example.com")
756         self.approve("outsider@example.com")
757         self.got_welcome("outsider@example.com")
758
759     def testSubscribeToModeratedWithoutAddressNotOnListRejected(self):
760         self.configure_list("moderated", "moderated")
761         self.send("outsider@example.com", "foo-subscribe@example.com")
762         self.confirm("outsider@example.com")
763         self.reject("outsider@example.com")
764
765     def testSubscribeToModeratedWithoutAddressAlreadyOnListApproved(self):
766         self.configure_list("moderated", "moderated")
767         self.send("user1@example.com", "foo-subscribe@example.com")
768         self.confirm("user1@example.com")
769         self.approve("user1@example.com")
770         self.got_welcome("user1@example.com")
771
772     def testSubscribeToModeratedWithoutAddressAlreadyOnListRejected(self):
773         self.configure_list("moderated", "moderated")
774         self.send("user1@example.com", "foo-subscribe@example.com")
775         self.confirm("user1@example.com")
776         self.reject("user1@example.com")
777
778     def testSubscribeToModeratedWithAddressNotOnListApproved(self):
779         self.configure_list("moderated", "moderated")
780         self.send("somebody.else@example.com", 
781                   "foo-subscribe-outsider=example.com@example.com")
782         self.confirm("outsider@example.com")
783         self.approve("outsider@example.com")
784         self.got_welcome("outsider@example.com")
785
786     def testSubscribeToModeratedWithAddressNotOnListRejected(self):
787         self.configure_list("moderated", "moderated")
788         self.send("somebody.else@example.com", 
789                   "foo-subscribe-outsider=example.com@example.com")
790         self.confirm("outsider@example.com")
791         self.reject("outsider@example.com")
792
793     def testSubscribeToModeratedWithAddressAlreadyOnListApproved(self):
794         self.configure_list("moderated", "moderated")
795         self.send("somebody.else@example.com", 
796                   "foo-subscribe-user1=example.com@example.com")
797         self.confirm("user1@example.com")
798         self.approve("user1@example.com")
799         self.got_welcome("user1@example.com")
800
801     def testSubscribeToModeratedWithAddressAlreadyOnListRejected(self):
802         self.configure_list("moderated", "moderated")
803         self.send("somebody.else@example.com", 
804                   "foo-subscribe-user1=example.com@example.com")
805         self.confirm("user1@example.com")
806         self.reject("user1@example.com")
807
808 class UnsubscriptionTestCases(IncomingBase):
809
810     def confirm(self, recipient):
811         # List has sent confirmation request. Respond to it.
812         a = self.match("foo-ignore@example.com", 
813                        "foo-unsubyes-[^@]*@example.com", 
814                        recipient,
815                        "Please confirm UNsubscription")
816         self.no_more_mail()
817         
818         # Confirm sub. req. List response will be analyzed later.
819         self.send("something.random@example.com", a)
820
821     def got_goodbye(self, recipient):
822         self.match("foo-ignore@example.com", 
823                    None, 
824                    recipient, 
825                    "Goodbye from")
826         self.no_more_mail()
827
828     def testUnsubscribeWithoutAddressNotOnList(self):
829         self.send("outsider@example.com", "foo-unsubscribe@example.com")
830         self.confirm("outsider@example.com")
831         self.got_goodbye("outsider@example.com")
832
833     def testUnsubscribeWithoutAddressOnList(self):
834         self.send("user1@example.com", "foo-unsubscribe@example.com")
835         self.confirm("user1@example.com")
836         self.got_goodbye("user1@example.com")
837
838     def testUnsubscribeWithAddressNotOnList(self):
839         self.send("somebody.else@example.com", 
840                   "foo-unsubscribe-outsider=example.com@example.com")
841         self.confirm("outsider@example.com")
842         self.got_goodbye("outsider@example.com")
843
844     def testUnsubscribeWithAddressOnList(self):
845         self.send("somebody.else@example.com", 
846                   "foo-unsubscribe-user1=example.com@example.com")
847         self.confirm("user1@example.com")
848         self.got_goodbye("user1@example.com")
849
850 class PostTestCases(IncomingBase):
851
852     msg = u"Subject: something \u00c4\n\nhello, world\n".encode("utf8")
853
854     def approve(self, user_recipient):
855         self.match("foo-ignore@example.com", None, user_recipient)
856         a = self.match("foo-ignore@example.com", 
857                        "foo-approve-[^@]*@example.com",
858                        "listmaster@example.com")
859         self.send("listmaster@example.com", a)
860
861     def reject(self, user_recipient):
862         self.match("foo-ignore@example.com", None, user_recipient)
863         a = self.match("foo-ignore@example.com", 
864                        "foo-reject-[^@]*@example.com",
865                        "listmaster@example.com")
866         self.send("listmaster@example.com", a)
867
868     def check_headers_are_encoded(self):
869         ok_chars = "\t\r\n"
870         for code in range(32, 127):
871             ok_chars = ok_chars + chr(code)
872         for mail in self.sent_mail:
873             text = mail["text"]
874             self.failUnless("\n\n" in text)
875             headers = text.split("\n\n")[0]
876             for c in headers:
877                 if c not in ok_chars: print headers
878                 self.failUnless(c in ok_chars)
879
880     def check_mail_to_list(self):
881         self.check_headers_are_encoded()
882         self.match("foo-bounce-.*@example.com", None, "USER1@example.com",
883                    body="hello, world",
884                    header="X-Foo: FOO",
885                    anti_header="Received:")
886         self.match("foo-bounce-.*@example.com", None, "user2@EXAMPLE.com",
887                    body="hello, world",
888                    header="x-foo: foo",
889                    anti_header="Received:")
890         self.no_more_mail()
891
892     def check_that_moderation_box_is_empty(self):
893         ml = self.mlm.open_list("foo@example.com")
894         self.failUnlessEqual(os.listdir(ml.moderation_box.boxdir), [])
895
896     def testSubscriberPostsToUnmoderated(self):
897         self.configure_list("free", "free")
898         self.send("user1@example.com", "foo@example.com", 
899                   self.msg)
900         self.check_mail_to_list()
901
902     def testOutsiderPostsToUnmoderated(self):
903         self.configure_list("free", "free")
904         self.send("outsider@example.com", "foo@example.com", self.msg)
905         self.check_mail_to_list()
906
907     def testSubscriberPostToAutomoderated(self):
908         self.configure_list("free", "auto")
909         self.check_that_moderation_box_is_empty()
910         self.send("user1@example.com", "foo@example.com", self.msg)
911         self.check_mail_to_list()
912         self.check_that_moderation_box_is_empty()
913
914     def testOutsiderPostsToAutomoderatedRejected(self):
915         self.configure_list("free", "auto")
916         self.check_that_moderation_box_is_empty()
917         self.send("outsider@example.com", "foo@example.com", self.msg)
918         self.reject("outsider@example.com")
919         self.check_that_moderation_box_is_empty()
920
921     def testOutsiderPostsToAutomoderatedApproved(self):
922         self.configure_list("free", "auto")
923         self.check_that_moderation_box_is_empty()
924         self.send("outsider@example.com", "foo@example.com", self.msg)
925         self.approve("outsider@example.com")
926         self.check_mail_to_list()
927         self.check_that_moderation_box_is_empty()
928
929     def testSubscriberPostsToModeratedRejected(self):
930         self.configure_list("free", "moderated")
931         self.check_that_moderation_box_is_empty()
932         self.send("user1@example.com", "foo@example.com", self.msg)
933         self.reject("user1@example.com")
934         self.check_that_moderation_box_is_empty()
935
936     def testOutsiderPostsToMderatedApproved(self):
937         self.configure_list("free", "moderated")
938         self.check_that_moderation_box_is_empty()
939         self.send("outsider@example.com", "foo@example.com", self.msg)
940         self.approve("outsider@example.com")
941         self.check_mail_to_list()
942         self.check_that_moderation_box_is_empty()
943
944     def testSubscriberPostsWithRequestToBeModerated(self):
945         self.configure_list("free", "free")
946
947         self.check_that_moderation_box_is_empty()
948         self.send("user1@example.com", "foo@example.com", self.msg,
949                   force_moderation=1)
950         self.match("foo-ignore@example.com", 
951                    None, 
952                    "user1@example.com", 
953                    "Subject: Please wait")
954         a = self.match("foo-ignore@example.com", 
955                        "foo-approve-[^@]*@example.com", 
956                        "listmaster@example.com")
957         self.no_more_mail()
958
959         self.send("listmaster@example.com", a)
960         self.check_mail_to_list()
961         self.check_that_moderation_box_is_empty()
962
963     def testSubscriberPostsWithModerationOverride(self):
964         self.configure_list("moderated", "moderated")
965         self.send("user1@example.com", "foo@example.com", self.msg,
966                   force_posting=1)
967         self.check_mail_to_list()
968         self.check_that_moderation_box_is_empty()
969
970 class BounceTestCases(IncomingBase):
971
972     def check_subscriber_status(self, must_be):
973         ml = self.mlm.open_list("foo@example.com")
974         for id in ml.subscribers.groups():
975             self.failUnlessEqual(ml.subscribers.get(id, "status"), must_be)
976
977     def bounce_sent_mail(self):
978         for m in self.sent_mail[:]:
979             self.send("something@example.com", m["sender"], "eek")
980             self.failUnlessEqual(len(self.sent_mail), 0)
981
982     def send_mail_to_list_then_bounce_everything(self):
983         self.send("user@example.com", "foo@example.com", "hello")
984         for m in self.sent_mail[:]:
985             self.send("foo@example.com", m["sender"], "eek")
986             self.failUnlessEqual(len(self.sent_mail), 0)
987
988     def testBounceOnceThenRecover(self):
989         self.check_subscriber_status("ok")
990         self.send_mail_to_list_then_bounce_everything()
991
992         self.check_subscriber_status("bounced")
993         
994         ml = self.mlm.open_list("foo@example.com")
995         for id in ml.subscribers.groups():
996             bounce_id = ml.subscribers.get(id, "bounce-id")
997             self.failUnless(bounce_id)
998             self.failUnless(ml.bounce_box.has(bounce_id))
999
1000         bounce_ids = []
1001         now = time.time()
1002         ml = self.mlm.open_list("foo@example.com")
1003         ml.subscribers.lock()
1004         for id in ml.subscribers.groups():
1005             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
1006             self.failUnless(abs(timestamp - now) < 10.0)
1007             ml.subscribers.set(id, "timestamp-bounced", "69.0")
1008             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
1009         ml.subscribers.save()
1010
1011         self.mlm.cleaning_woman(no_op)
1012         self.check_subscriber_status("probed")
1013
1014         for bounce_id in bounce_ids:
1015             self.failUnless(ml.bounce_box.has(bounce_id))
1016
1017         self.mlm.cleaning_woman(no_op)
1018         ml = self.mlm.open_list("foo@example.com")
1019         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1020         self.check_subscriber_status("ok")
1021         for bounce_id in bounce_ids:
1022             self.failUnless(not ml.bounce_box.has(bounce_id))
1023
1024     def testBounceProbeAlso(self):
1025         self.check_subscriber_status("ok")
1026         self.send_mail_to_list_then_bounce_everything()
1027         self.check_subscriber_status("bounced")
1028         
1029         ml = self.mlm.open_list("foo@example.com")
1030         for id in ml.subscribers.groups():
1031             bounce_id = ml.subscribers.get(id, "bounce-id")
1032             self.failUnless(bounce_id)
1033             self.failUnless(ml.bounce_box.has(bounce_id))
1034
1035         bounce_ids = []
1036         now = time.time()
1037         ml = self.mlm.open_list("foo@example.com")
1038         ml.subscribers.lock()
1039         for id in ml.subscribers.groups():
1040             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
1041             self.failUnless(abs(timestamp - now) < 10.0)
1042             ml.subscribers.set(id, "timestamp-bounced", "69.0")
1043             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
1044         ml.subscribers.save()
1045
1046         self.sent_mail = []
1047         self.mlm.cleaning_woman(self.catch_sendmail)
1048         self.check_subscriber_status("probed")
1049         for bounce_id in bounce_ids:
1050             self.failUnless(ml.bounce_box.has(bounce_id))
1051         self.bounce_sent_mail()
1052         self.check_subscriber_status("probebounced")
1053
1054         self.mlm.cleaning_woman(no_op)
1055         ml = self.mlm.open_list("foo@example.com")
1056         self.failUnlessEqual(len(ml.subscribers.groups()), 0)
1057         for bounce_id in bounce_ids:
1058             self.failUnless(not ml.bounce_box.has(bounce_id))
1059
1060     def testCleaningWomanJoinsAndBounceSplitsGroups(self):
1061         # Check that each group contains one address and set the creation
1062         # timestamp to an ancient time.
1063         ml = self.mlm.open_list("foo@example.com")
1064         bouncedir = os.path.join(ml.dirname, "bounce-box")
1065         ml.subscribers.lock()
1066         for id in ml.subscribers.groups():
1067             addrs = ml.subscribers.in_group(id)
1068             self.failUnlessEqual(len(addrs), 1)
1069             bounce_id = ml.subscribers.get(id, "bounce-id")
1070             self.failUnlessEqual(bounce_id, "..notexist..")
1071             bounce_id = "bounce-" + id
1072             ml.subscribers.set(id, "bounce-id", bounce_id)
1073             bounce_path = os.path.join(bouncedir, bounce_id)
1074             self.failUnless(not os.path.isfile(bounce_path))
1075             f = open(bounce_path, "w")
1076             f.close()
1077             f = open(bounce_path + ".address", "w")
1078             f.close()
1079             self.failUnless(os.path.isfile(bounce_path))
1080             ml.subscribers.set(id, "timestamp-created", "1")
1081         ml.subscribers.save()
1082
1083         # Check that --cleaning-woman joins the two groups into one.
1084         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1085         self.mlm.cleaning_woman(no_op)
1086         ml = self.mlm.open_list("foo@example.com")
1087         self.failUnlessEqual(len(ml.subscribers.groups()), 1)
1088         self.failUnlessEqual(os.listdir(bouncedir), [])
1089         
1090         # Check that a bounce splits the single group.
1091         self.send_mail_to_list_then_bounce_everything()
1092         ml = self.mlm.open_list("foo@example.com")
1093         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1094         
1095         # Check that a --cleaning-woman immediately after doesn't join.
1096         # (The groups are new, thus shouldn't be joined for a week.)
1097         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1098         self.mlm.cleaning_woman(no_op)
1099         ml = self.mlm.open_list("foo@example.com")
1100         self.failUnlessEqual(len(ml.subscribers.groups()), 2)