add feature: configurable subject prefixing for mailing lists
[eoc.git] / eocTests.py
1 import unittest
2 import shutil
3 import os
4 import re
5 import time
6 import string
7
8 import eoc
9
# Name of the scratch directory the tests use as eoc's dot directory.
# Several test cases below create it and remove it again.
DOTDIR = "dot-dir-for-testing"
# Point the eoc module itself at the same scratch directory.
eoc.DOTDIR = DOTDIR
# Set eoc's "quiet" flag -- presumably this silences diagnostic output
# while the tests run; confirm against eoc.
eoc.quiet = 1
13
def no_op(*args):
    """Accept any positional arguments, do nothing, and return None."""
    return None
16
class AddressCleaningTestCases(unittest.TestCase):
    """Check eoc.AddressParser.clean() with and without a skip prefix
    and a forced domain."""

    def setUp(self):
        self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
                                     "bar@lists.example.com"])

    def verify(self, address, wanted, skip_prefix=None, forced_domain=None):
        """Clean `address` and require that the result equals `wanted`.

        `skip_prefix` and `forced_domain` are handed to the parser before
        cleaning; None is passed through unchanged for either of them.
        """
        self.ap.set_skip_prefix(skip_prefix)
        self.ap.set_forced_domain(forced_domain)
        # assertEqual replaces the deprecated failUnlessEqual alias, which
        # no longer exists in Python 3.12.
        self.assertEqual(self.ap.clean(address), wanted)

    def testSimpleAddress(self):
        self.verify("foo@example.com", "foo@example.com")

    def testUpperCaseAddress(self):
        self.verify("FOO@EXAMPLE.COM", "foo@example.com")

    def testPrefixRemoval(self):
        self.verify("foo@example.com", "foo@example.com",
                    skip_prefix="user-")
        self.verify("user-foo@example.com", "foo@example.com",
                    skip_prefix="user-")

    def testForcedDomain(self):
        self.verify("foo@example.com", "foo@example.com",
                    forced_domain="example.com")
        self.verify("foo@whatever.example.com", "foo@example.com",
                    forced_domain="example.com")

    def testPrefixRemovalWithForcedDomain(self):
        # Every combination of (prefixed or not) x (foreign domain or not)
        # must clean to the same canonical address.
        for address in ["foo@example.com",
                        "foo@whatever.example.com",
                        "user-foo@example.com",
                        "user-foo@whatever.example.com"]:
            self.verify(address, "foo@example.com",
                        skip_prefix="user-",
                        forced_domain="example.com")
60
class AddressParserTestCases(unittest.TestCase):
    """Check that eoc.AddressParser.parse() splits a recipient address
    into the list name and the command parts."""

    def setUp(self):
        self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
                                     "bar@lists.example.com"])

    def verify_parser(self, address, wanted_listname, wanted_parts):
        """Parse `address` and compare both halves of the result."""
        listname, parts = self.ap.parse(address)
        # assertEqual replaces the deprecated failUnlessEqual alias
        # (removed in Python 3.12).
        self.assertEqual(listname, wanted_listname)
        self.assertEqual(parts, wanted_parts)

    def testParser(self):
        # The list name is expected in the spelling given to the parser's
        # constructor ("foo@EXAMPLE.com"), the parts in lower case.
        self.verify_parser("foo@example.com",
                           "foo@EXAMPLE.com",
                           [])
        self.verify_parser("foo-subscribe@example.com",
                           "foo@EXAMPLE.com",
                           ["subscribe"])
        self.verify_parser("foo-subscribe-joe=example.com@example.com",
                           "foo@EXAMPLE.com",
                           ["subscribe", "joe=example.com"])
        self.verify_parser("foo-bounce-123-ABCDEF@example.com",
                           "foo@EXAMPLE.com",
                           ["bounce", "123", "abcdef"])
85
class ParseRecipientAddressBase(unittest.TestCase):
    """Common fixture for the recipient-address parsing tests: a
    MailingListManager that knows three list names."""

    def setUp(self):
        list_names = [
            "foo@example.com",
            "bar@lists.example.com",
            "foo-announce@example.com",
        ]
        self.lists = list_names
        self.mlm = eoc.MailingListManager(DOTDIR, lists=list_names)

    def environ(self, sender, recipient):
        """Hand eoc a SENDER/RECIPIENT environment for one message."""
        env = {"SENDER": sender, "RECIPIENT": recipient}
        eoc.set_environ(env)
99
class ParseUnsignedAddressTestCases(ParseRecipientAddressBase):
    """Recipient addresses whose commands carry no signature."""

    def testEmpty(self):
        # assertRaises replaces the deprecated failUnlessRaises alias
        # (removed in Python 3.12).
        self.assertRaises(eoc.UnknownList,
                          self.mlm.parse_recipient_address,
                          "", None, None)

    def verify(self, address, skip_prefix, forced_domain, wanted_dict):
        """Parse `address` and require the result to equal `wanted_dict`."""
        parsed = self.mlm.parse_recipient_address(address, skip_prefix,
                                                  forced_domain)
        self.assertEqual(parsed, wanted_dict)

    def testSimpleAddresses(self):
        self.verify("foo@example.com",
                    None,
                    None,
                    { "name": "foo@example.com", "command": "post" })
        self.verify("FOO@EXAMPLE.COM",
                    None,
                    None,
                    { "name": "foo@example.com", "command": "post" })
        self.verify("prefix-foo@example.com",
                    "prefix-",
                    None,
                    { "name": "foo@example.com", "command": "post" })
        self.verify("bar@example.com",
                    None,
                    "lists.example.com",
                    { "name": "bar@lists.example.com", "command": "post" })
        self.verify("prefix-bar@example.com",
                    "prefix-",
                    "lists.example.com",
                    { "name": "bar@lists.example.com", "command": "post" })

    def testSubscription(self):
        self.verify("foo-subscribe@example.com",
                    None,
                    None,
                    { "name": "foo@example.com",
                      "command": "subscribe",
                      "sender": "",
                    })
        self.verify("foo-subscribe-joe-user=example.com@example.com",
                    None,
                    None,
                    { "name": "foo@example.com",
                      "command": "subscribe",
                      "sender": "joe-user@example.com",
                    })
        self.verify("foo-unsubscribe@example.com",
                    None,
                    None,
                    { "name": "foo@example.com",
                      "command": "unsubscribe",
                      "sender": "",
                    })
        self.verify("foo-unsubscribe-joe-user=example.com@example.com",
                    None,
                    None,
                    { "name": "foo@example.com",
                      "command": "unsubscribe",
                      "sender": "joe-user@example.com",
                    })

    def testPost(self):
        for name in self.lists:
            self.verify(name, None, None, { "name": name, "command": "post" })

    def testSimpleCommands(self):
        for name in self.lists:
            # The split depends only on the list name, so do it once per
            # list instead of once per command.
            localpart, domain = name.split("@")
            for command in ["help", "list", "owner"]:
                address = "%s-%s@%s" % (localpart, command, domain)
                self.verify(address, None, None,
                            { "name": name,
                              "command": command
                            })
177
class ParseWellSignedAddressTestCases(ParseRecipientAddressBase):
    """Signed command addresses carrying a hash computed by the manager
    itself must parse into name, command and id."""

    def try_good_signature(self, command):
        """Build a signed address for `command` with id "1" and parse it."""
        unsigned_local = "foo-announce-%s-1" % command
        signature = self.mlm.compute_hash("%s@%s" % (unsigned_local,
                                                      "example.com"))
        signed_local = "%s-%s" % (unsigned_local, signature)
        parsed = self.mlm.parse_recipient_address(
            "%s@example.com" % signed_local, None, None)
        # assertEqual replaces the deprecated failUnlessEqual alias
        # (removed in Python 3.12).
        self.assertEqual(parsed,
                         {
                            "name": "foo-announce@example.com",
                            "command": command,
                            "id": "1",
                         })

    def testProperlySignedCommands(self):
        for command in ["subyes", "subapprove", "subreject", "unsubyes",
                        "bounce", "approve", "reject", "probe"]:
            self.try_good_signature(command)
202
class ParseBadlySignedAddressTestCases(ParseRecipientAddressBase):
    """Signed command addresses with a bogus hash must be rejected."""

    def try_bad_signature(self, command_part):
        """Expect eoc.BadSignature for `command_part` with a bad hash."""
        address = "foo-announce-" + command_part + "-123-badhash@example.com"
        # assertRaises replaces the deprecated failUnlessRaises alias
        # (removed in Python 3.12).
        self.assertRaises(eoc.BadSignature,
                          self.mlm.parse_recipient_address,
                          address,
                          None, None)

    def testBadlySignedCommands(self):
        for command in ["subyes", "subapprove", "subreject", "unsubyes",
                        "bounce", "approve", "reject", "probe"]:
            self.try_bad_signature(command)
221
class DotDirTestCases(unittest.TestCase):
    """Creating a MailingListManager must leave behind a dot directory
    that contains a "secret" file."""

    def setUp(self):
        # Both tests assert that DOTDIR is absent at the start, so clear
        # anything an earlier test (or an aborted run) left behind, as the
        # other fixtures in this file do.
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)
        self.secret_name = os.path.join(DOTDIR, "secret")

    def tearDown(self):
        # Guarded so that a test failing before the directory was created
        # does not report a second, unrelated error from rmtree.
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)

    def dotdir_is_ok(self):
        """Require the dot directory and its secret file to exist."""
        # assertTrue/assertFalse replace the deprecated failUnless/failIf
        # aliases (removed in Python 3.12).
        self.assertTrue(os.path.isdir(DOTDIR))
        self.assertTrue(os.path.isfile(self.secret_name))

    def testNoDotDirExists(self):
        self.assertFalse(os.path.exists(DOTDIR))
        eoc.MailingListManager(DOTDIR)
        self.dotdir_is_ok()

    def testDotDirDoesExistButSecretDoesNot(self):
        self.assertFalse(os.path.exists(DOTDIR))
        os.makedirs(DOTDIR)
        self.assertTrue(os.path.isdir(DOTDIR))
        self.assertFalse(os.path.exists(self.secret_name))
        eoc.MailingListManager(DOTDIR)
        self.dotdir_is_ok()
246
247
class RemoveSomeHeadersTest(unittest.TestCase):
    """MailingList.remove_some_headers() must drop a named header
    together with its continuation lines and keep everything else."""

    def testRemoveSomeHeaders(self):
        mlm = eoc.MailingListManager(DOTDIR)
        ml = eoc.MailingList(mlm, "list@example.com")
        # Header-2 is folded over several lines and contains a colon in a
        # continuation line; all of it has to go.
        mail = """\
Header-1: this is a simple header
Header-2: this
    is
    a
    complex header with a colon: yes it is
Header-3: odd numbered headers are simple

Body.
"""
        mail2 = ml.remove_some_headers(mail, ["Header-2"])
        # assertEqual replaces the deprecated failUnlessEqual alias
        # (removed in Python 3.12).
        self.assertEqual(mail2, """\
Header-1: this is a simple header
Header-3: odd numbered headers are simple

Body.
""")
270
class ListBase(unittest.TestCase):
    """Fixture giving each test a MailingListManager on a fresh DOTDIR."""

    def setUp(self):
        # Start from a clean slate even if an earlier run left DOTDIR around.
        leftover = os.path.exists(DOTDIR)
        if leftover:
            shutil.rmtree(DOTDIR)
        self.mlm = eoc.MailingListManager(DOTDIR)

    def tearDown(self):
        # Drop the manager first, then delete everything it put on disk.
        self.mlm = None
        shutil.rmtree(DOTDIR)
281
class ListCreationTestCases(ListBase):
    """Creating a list must produce a list directory with exactly the
    expected files and subdirectories."""

    def setUp(self):
        ListBase.setUp(self)
        # Directory entries not yet accounted for; filled in by
        # listdir_is_ok() and consumed by the listdir_has_* helpers.
        self.names = None

    def listdir(self, listname):
        """Return the path of the directory belonging to `listname`."""
        return os.path.join(DOTDIR, listname)

    def listdir_has_file(self, listdir, filename):
        """Require a regular file `filename` and tick it off."""
        # assertTrue replaces the deprecated failUnless alias (removed in
        # Python 3.12).
        self.assertTrue(os.path.isfile(os.path.join(listdir, filename)))
        self.names.remove(filename)

    def listdir_has_dir(self, listdir, dirname):
        """Require a subdirectory `dirname` and tick it off."""
        self.assertTrue(os.path.isdir(os.path.join(listdir, dirname)))
        self.names.remove(dirname)

    def listdir_may_have_dir(self, listdir, dirname):
        """Tick off `dirname` if present; it must then be a directory."""
        if dirname in self.names:
            self.listdir_has_dir(listdir, dirname)

    def listdir_is_ok(self, listname):
        """Check the complete contents of the directory for `listname`."""
        listdir = self.listdir(listname)
        self.assertTrue(os.path.isdir(listdir))
        self.names = os.listdir(listdir)

        self.listdir_has_file(listdir, "config")
        self.listdir_has_file(listdir, "subscribers")

        self.listdir_has_dir(listdir, "bounce-box")
        self.listdir_has_dir(listdir, "subscription-box")

        self.listdir_may_have_dir(listdir, "moderation-box")
        self.listdir_may_have_dir(listdir, "templates")

        # Make sure there are no extras.
        self.assertEqual(self.names, [])

    def testCreateNew(self):
        self.assertFalse(os.path.exists(self.listdir("foo@example.com")))
        ml = self.mlm.create_list("foo@example.com")

        self.assertEqual(ml.__class__, eoc.MailingList)
        self.assertEqual(ml.dirname, self.listdir("foo@example.com"))

        self.listdir_is_ok("foo@example.com")

    def testCreateExisting(self):
        self.mlm.create_list("foo@example.com")
        self.assertRaises(eoc.ListExists,
                          self.mlm.create_list, "foo@example.com")
        # The failed second attempt must not have damaged the first list.
        self.listdir_is_ok("foo@example.com")
334
class ListOptionTestCases(ListBase):
    """The per-list configuration must have the expected defaults and
    survive a save/re-open round trip."""

    # Options this test expects in the "list" section of a newly created
    # list. Tests copy this dict before overriding entries.
    DEFAULT_OPTIONS = {
        "owners": "",
        "moderators": "",
        "subscription": "free",
        "posting": "free",
        "archived": "no",
        "mail-on-subscription-changes": "no",
        "mail-on-forced-unsubscribe": "no",
        "ignore-bounce": "no",
        "language": "",
        "pristine-headers": "",
        "subject-prefix": "",
    }

    def check(self, ml, wanted):
        """Require `ml`'s config to hold only a "list" section whose
        options equal the dict `wanted`."""
        # assertEqual replaces the deprecated failUnlessEqual alias
        # (removed in Python 3.12).
        self.assertEqual(ml.cp.sections(), ["list"])
        self.assertEqual(dict(ml.cp.items("list")), wanted)

    def testDefaultOptionsOnCreateAndOpenExisting(self):
        self.mlm.create_list("foo@example.com")
        ml = self.mlm.open_list("foo@example.com")
        self.check(ml, dict(self.DEFAULT_OPTIONS))

    def testChangeOptions(self):
        # Create a list, change some options, and save the result.
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.cp.get("list", "owners"), "")
        self.assertEqual(ml.cp.get("list", "posting"), "free")
        ml.cp.set("list", "owners", "owner@example.com")
        ml.cp.set("list", "posting", "moderated")
        ml.save_config()

        # Re-open the list and check that the new instance has read the
        # values from the disk correctly.
        wanted = dict(self.DEFAULT_OPTIONS)
        wanted["owners"] = "owner@example.com"
        wanted["posting"] = "moderated"
        ml2 = self.mlm.open_list("foo@example.com")
        self.check(ml2, wanted)
388
class SubscriberDatabaseTestCases(ListBase):
    """Tests for a list's subscriber database, plus the subject-prefix
    option of a list."""

    def has_subscribers(self, ml, addrs):
        """Require `ml`'s subscribers, once sorted, to equal `addrs`
        (which the caller passes already sorted)."""
        subs = ml.subscribers.get_all()
        subs.sort()
        # assertEqual replaces the deprecated failUnlessEqual alias
        # (removed in Python 3.12).
        self.assertEqual(subs, addrs)

    def testAddAndRemoveSubscribers(self):
        addrs = ["joe@example.com", "MARY@example.com", "bubba@EXAMPLE.com"]
        addrs.sort()

        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.get_all(), [])

        self.assertTrue(ml.subscribers.lock())
        ml.subscribers.add_many(addrs)
        self.has_subscribers(ml, addrs)
        ml.subscribers.save()
        # Saving is expected to release the lock again.
        self.assertFalse(ml.subscribers.locked)
        ml = None

        ml2 = self.mlm.open_list("foo@example.com")
        self.has_subscribers(ml2, addrs)
        ml2.subscribers.lock()
        ml2.subscribers.remove(addrs[0])
        self.has_subscribers(ml2, addrs[1:])

        ml2.subscribers.save()

        ml3 = self.mlm.open_list("foo@example.com")
        self.has_subscribers(ml3, addrs[1:])

    def testSubscribeTwice(self):
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.get_all(), [])
        ml.subscribers.lock()
        ml.subscribers.add("user@example.com")
        ml.subscribers.add("USER@example.com")
        # Compare case-insensitively. List comprehensions are used rather
        # than map(string.lower, ...): string.lower does not exist in
        # Python 3, and map objects there never compare equal.
        self.assertEqual([addr.lower() for addr in ml.subscribers.get_all()],
                         [addr.lower() for addr in ["USER@example.com"]])

    def testSubscriberAttributesAndGroups(self):
        addrs = ["joe@example.com", "mary@example.com"]
        addrs.sort()
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.groups(), [])
        ml.subscribers.lock()
        group_id = ml.subscribers.add_many(addrs)
        self.assertEqual(ml.subscribers.groups(), ["0"])
        self.assertEqual(ml.subscribers.get(group_id, "status"), "ok")
        ml.subscribers.set(group_id, "status", "bounced")
        self.assertEqual(ml.subscribers.get(group_id, "status"), "bounced")
        subs = ml.subscribers.in_group(group_id)
        subs.sort()
        self.assertEqual(subs, addrs)

    def testSubjectPrefix(self):
        ml = self.mlm.create_list("prefix@example.com")
        ml.cp.set("list", "subject-prefix", "[test]")
        ml.save_config()

        self.assertEqual(ml.cp.get("list", "subject-prefix"), "[test]")

        mail = """\
To: test@example.com
From: test2@example.com
Subject: testing whether the subject prefix works
Precedence: bulk

Body.
"""
        prefixed_mail = ml.add_subject_prefix(mail)

        # Only the Subject line may change: the prefix and one space are
        # inserted in front of the original subject text.
        self.assertEqual(prefixed_mail, """\
To: test@example.com
From: test2@example.com
Subject: [test] testing whether the subject prefix works
Precedence: bulk

Body.
""")
470
471
class ModerationBoxTestCases(ListBase):
    """Adding, reading back and removing a message in a list's
    moderation box, checked through the API and on disk."""

    def testModerationBox(self):
        ml = self.mlm.create_list("foo@example.com")
        listdir = os.path.join(DOTDIR, "foo@example.com")
        boxdir = os.path.join(listdir, "moderation-box")

        # assertEqual/assertTrue/assertFalse replace the deprecated
        # failUnlessEqual/failUnless/failIf aliases (removed in 3.12).
        self.assertEqual(boxdir, ml.moderation_box.boxdir)
        self.assertTrue(os.path.isdir(boxdir))

        mailtext = "From: foo\nTo: bar\n\nhello\n"
        mail_id = ml.moderation_box.add("foo", mailtext)
        self.assertTrue(ml.moderation_box.has(mail_id))
        self.assertEqual(ml.moderation_box.get_address(mail_id), "foo")
        self.assertEqual(ml.moderation_box.get(mail_id), mailtext)

        # The message and its address are expected as two files named
        # after the id inside the box directory.
        filename = os.path.join(boxdir, mail_id)
        self.assertTrue(os.path.isfile(filename))
        self.assertTrue(os.path.isfile(filename + ".address"))

        ml.moderation_box.remove(mail_id)
        self.assertFalse(ml.moderation_box.has(mail_id))
        self.assertFalse(os.path.exists(filename))
495
class IncomingBase(unittest.TestCase):
    """Fixture for tests that feed incoming mail to a list.

    setUp() creates the list "foo@EXAMPLE.com" with one owner and two
    subscribers. send() delivers a message to the list while capturing
    everything the list tries to send; match() and no_more_mail() then
    inspect the captured messages.
    """

    def setUp(self):
        if os.path.isdir(DOTDIR):
            shutil.rmtree(DOTDIR)
        self.mlm = eoc.MailingListManager(DOTDIR)
        self.ml = None
        ml = self.mlm.create_list("foo@EXAMPLE.com")
        ml.cp.set("list", "owners", "listmaster@example.com")
        ml.save_config()
        ml.subscribers.lock()
        ml.subscribers.add("USER1@example.com")
        ml.subscribers.add("user2@EXAMPLE.com")
        ml.subscribers.save()
        self.write_file_in_listdir(ml, "headers-to-add", "X-Foo: foo\n")
        self.write_file_in_listdir(ml, "headers-to-remove", "Received\n")
        self.sent_mail = []

    def tearDown(self):
        shutil.rmtree(DOTDIR)

    def write_file_in_listdir(self, ml, basename, contents):
        """Write `contents` to the file `basename` in `ml`'s directory."""
        f = open(os.path.join(ml.dirname, basename), "w")
        try:
            f.write(contents)
        finally:
            # Close the handle even when the write fails.
            f.close()

    def configure_list(self, subscription, posting):
        """Set and save the subscription and posting policies of the list."""
        ml = self.mlm.open_list("foo@example.com")
        ml.cp.set("list", "subscription", subscription)
        ml.cp.set("list", "posting", posting)
        ml.save_config()

    def environ(self, sender, recipient):
        """Hand eoc a SENDER/RECIPIENT environment for one message."""
        eoc.set_environ({
            "SENDER": sender,
            "RECIPIENT": recipient,
        })

    def catch_sendmail(self, sender, recipients, text):
        """Stand-in for the manager's send_mail: record the message in
        self.sent_mail instead of sending it."""
        self.sent_mail.append({
            "sender": sender,
            "recipients": recipients,
            "text": text,
        })

    def send(self, sender, recipient, text="", force_moderation=0,
             force_posting=0):
        """Deliver one message to `recipient` and let the list act on it.

        `text` is the message; if it contains no blank line it is treated
        as a bare body. A "Received:" header is always prepended.
        self.sent_mail is reset and afterwards holds whatever the list
        sent in response.
        """
        self.environ(sender, recipient)
        parsed = self.mlm.parse_recipient_address(recipient, None, None)
        parsed["force-moderation"] = force_moderation
        parsed["force-posting"] = force_posting
        self.ml = self.mlm.open_list(parsed["name"])
        if "\n\n" not in text:
            text = "\n\n" + text
        text = "Received: foobar\n" + text
        # The list reads the message through read_stdin; feed it our text.
        self.ml.read_stdin = lambda t=text: t
        self.mlm.send_mail = self.catch_sendmail
        self.sent_mail = []
        self.ml.obey(parsed)

    def sender_matches(self, mail, sender):
        """Return the start of the envelope sender matched by the regex
        `sender` (case-insensitive), or None if it does not match."""
        m = re.match("(?P<address>" + sender + ")", mail["sender"], re.I)
        if m is None:
            return None
        return m.group("address")

    def replyto_matches(self, mail, replyto):
        """Return the text matched by the regex `replyto` anywhere in the
        message text (case-insensitive), or None if there is none."""
        m = re.match("(.|\n)*(?P<address>" + replyto + ")", mail["text"],
                     re.I)
        if m is None:
            return None
        return m.group("address")

    def receiver_matches(self, mail, recipient):
        """Return whether `recipient` is the sole recipient of `mail`,
        ignoring case."""
        # A list comprehension instead of map(string.lower, ...):
        # string.lower is gone in Python 3, and a map object there would
        # never compare equal to a list.
        lowered = [addr.lower() for addr in mail["recipients"]]
        return lowered == [recipient.lower()]

    def body_matches(self, mail, body):
        """Return a true value if the regex `body` occurs in the message
        text (case-sensitive); an empty or None `body` always matches."""
        if not body:
            return 1
        pat = re.compile("(.|\n)*" + body + "(.|\n)*")
        return re.match(pat, mail["text"])

    def headers_match(self, mail, header):
        """Like body_matches(), but case-insensitive."""
        if not header:
            return 1
        pat = re.compile("(.|\n)*" + header + "(.|\n)*", re.I)
        return re.match(pat, mail["text"])

    def match(self, sender, replyto, receiver, body=None, header=None,
              anti_header=None):
        """Find, remove and report the first captured mail that fits.

        A mail fits when its sender matches `sender`, its only recipient
        is `receiver`, `body` and `header` occur in its text, and
        `anti_header` (if given) does not. If `replyto` is given it must
        occur in the text as well.

        Returns the text matched by `replyto`, or by `sender` when
        `replyto` is None. Fails the test if no captured mail fits.
        """
        ret = None
        for mail in self.sent_mail:
            found = self.sender_matches(mail, sender)
            if found is None:
                continue
            if replyto is not None:
                # With a reply-to pattern the caller wants that address
                # back, not the sender.
                found = self.replyto_matches(mail, replyto)
                if found is None:
                    continue
            if not self.receiver_matches(mail, receiver):
                continue
            if not self.body_matches(mail, body):
                continue
            if not self.headers_match(mail, header):
                continue
            if anti_header is not None and \
               self.headers_match(mail, anti_header) is not None:
                continue
            ret = found
            # Safe although we are iterating: we leave the loop at once.
            self.sent_mail.remove(mail)
            break
        # assertTrue replaces the deprecated failUnless alias (removed in
        # Python 3.12).
        self.assertTrue(ret is not None)
        return ret

    def no_more_mail(self):
        """Require that every captured mail has been consumed by match()."""
        self.assertEqual(self.sent_mail, [])
624
625
class SimpleCommandAddressTestCases(IncomingBase):
    # Commands anybody may send: help, owner and ignore.

    def testHelp(self):
        # A help request is answered to the requester.
        outsider = "outsider@example.com"
        self.send(outsider, "foo-help@example.com")
        self.match("foo-ignore@example.com", None, outsider,
                   body="Subject: Help for")
        self.no_more_mail()

    def testOwner(self):
        # Mail to the -owner address is passed on to the list owner.
        outsider = "outsider@example.com"
        self.send(outsider, "foo-owner@example.com", "abcde")
        self.match(outsider, None, "listmaster@example.com", body="abcde")
        self.no_more_mail()

    def testIgnore(self):
        # Mail to the -ignore address produces no mail at all.
        self.send("outsider@example.com", "foo-ignore@example.com", "abcde")
        self.no_more_mail()
643
class OwnerCommandTestCases(IncomingBase):
    # Commands reserved for the list owner, tried both as the owner
    # (listmaster@example.com) and as an outsider.

    def testList(self):
        # The owner gets the subscriber list: two lines, each user1 or
        # user2 at example.com in any letter case.
        owner = "listmaster@example.com"
        subscriber_line = ("[uU][sS][eE][rR][12]@"
                           "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n")
        self.send(owner, "foo-list@example.com")
        self.match("foo-ignore@example.com", None, owner,
                   body=subscriber_line + subscriber_line)
        self.no_more_mail()

    def testListDenied(self):
        outsider = "outsider@example.com"
        self.send(outsider, "foo-list@example.com")
        self.match("foo-ignore@example.com", None, outsider,
                   body="Subject: Subscriber list denied")
        self.no_more_mail()

    def testSetlist(self):
        owner = "listmaster@example.com"
        list_sender = "foo-ignore@example.com"

        # The owner submits a new subscriber list and is asked to confirm.
        self.send(owner, "foo-setlist@example.com",
                  "From: foo\n\nnew1@example.com\nuser1@example.com\n")
        confirm_address = self.match(
            list_sender,
            "foo-setlistyes-[^@]*@example.com",
            owner,
            body="Subject: Please moderate subscriber list")
        self.no_more_mail()

        # After confirmation the owner is notified, the new subscriber is
        # welcomed and the dropped one gets a goodbye.
        self.send(owner, confirm_address)
        self.match(list_sender, None, owner,
                   body="Subject: Subscriber list has been changed")
        self.match(list_sender, None, "new1@example.com",
                   body="Subject: Welcome to")
        self.match(list_sender, None, "user2@EXAMPLE.com",
                   body="Subject: Goodbye from")
        self.no_more_mail()

    def testSetlistSilently(self):
        owner = "listmaster@example.com"
        list_sender = "foo-ignore@example.com"

        self.send(owner, "foo-setlistsilently@example.com",
                  "From: foo\n\nnew1@example.com\nuser1@example.com\n")
        confirm_address = self.match(
            list_sender,
            "foo-setlistsilentyes-[^@]*@example.com",
            owner,
            body="Subject: Please moderate subscriber list")
        self.no_more_mail()

        # The silent variant tells only the owner about the change.
        self.send(owner, confirm_address)
        self.match(list_sender, None, owner,
                   body="Subject: Subscriber list has been changed")
        self.no_more_mail()

    def testSetlistDenied(self):
        outsider = "outsider@example.com"
        self.send(outsider, "foo-setlist@example.com",
                  "From: foo\n\nnew1@example.com\nnew2@example.com\n")
        self.match("foo-ignore@example.com", None, outsider,
                   body="Subject: You can't set the subscriber list")
        self.no_more_mail()

    def testSetlistBadlist(self):
        owner = "listmaster@example.com"
        self.send(owner, "foo-setlist@example.com",
                  "From: foo\n\nBlah blah blah.\n")
        self.match("foo-ignore@example.com", None, owner,
                   body="Subject: Bad address list")
        self.no_more_mail()

    def testOwnerSubscribesSomeoneElse(self):
        owner = "listmaster@example.com"
        list_sender = "foo-ignore@example.com"

        # Send subscription request. List sends confirmation request.
        self.send(owner, "foo-subscribe-outsider=example.com@example.com")
        confirm_address = self.match(list_sender,
                                     "foo-subyes-[^@]*@example.com",
                                     owner,
                                     body="Please confirm subscription")
        self.no_more_mail()

        # Confirm sub. req. List sends welcome.
        self.send(owner, confirm_address)
        self.match(list_sender, None, "outsider@example.com",
                   body="Welcome to the")
        self.no_more_mail()

    def testOwnerUnubscribesSomeoneElse(self):
        owner = "listmaster@example.com"
        list_sender = "foo-ignore@example.com"

        # Send unsubscription request. List sends confirmation request.
        self.send(owner, "foo-unsubscribe-outsider=example.com@example.com")
        confirm_address = self.match(
            list_sender,
            "foo-unsubyes-[^@]*@example.com",
            owner,
            body="Subject: Please confirm UNsubscription")
        self.no_more_mail()

        # Confirm unsub. req. List sends goodbye.
        self.send(owner, confirm_address)
        self.match(list_sender, None, "outsider@example.com",
                   body="Goodbye")
        self.no_more_mail()
744
class SubscriptionTestCases(IncomingBase):

    # Subscription requests sent to the test list foo@example.com.
    #
    # Every scenario is run against an unmoderated ("free") and a moderated
    # list, with the request coming either from the subscribing address
    # itself or from a third party that names the address in the request.
    # NOTE(review): user1@example.com is presumably a subscriber created by
    # the fixture and outsider@example.com is not -- the fixture lives
    # outside this class, confirm there.

    def confirm(self, recipient):
        # The list must have sent exactly one confirmation request to
        # `recipient`. Answer it by mailing whatever match() returned for
        # it (presumably the address the request asks us to reply to).
        # The list's reaction to the confirmation is checked by the caller.
        confirmation_address = self.match("foo-ignore@example.com",
                                          "foo-subyes-[^@]*@example.com",
                                          recipient,
                                          "Please confirm subscription")
        self.no_more_mail()
        self.send("something.random@example.com", confirmation_address)

    def got_welcome(self, recipient):
        # The only mail waiting must be the welcome message for `recipient`.
        self.match("foo-ignore@example.com", None, recipient,
                   "Welcome to the")
        self.no_more_mail()

    def _answer_moderator(self, user_recipient, request_pattern):
        # Expect one mail to the user and one to the list master whose
        # second match() argument fits `request_pattern`, then mail the
        # address match() returned for the latter as the list master.
        self.match("foo-ignore@example.com", None, user_recipient)
        decision_address = self.match("foo-ignore@example.com",
                                      request_pattern,
                                      "listmaster@example.com")
        self.send("listmaster@example.com", decision_address)

    def approve(self, user_recipient):
        # Act as the list master and approve the pending subscription.
        self._answer_moderator(user_recipient,
                               "foo-subapprove-[^@]*@example.com")

    def reject(self, user_recipient):
        # Act as the list master and reject the pending subscription.
        self._answer_moderator(user_recipient,
                               "foo-subreject-[^@]*@example.com")

    def _request_and_confirm(self, policy, sender, request_address,
                             subscriber):
        # Common opening of every test: configure the list with `policy`
        # for both configure_list() arguments, send the subscription
        # request from `sender`, and confirm it on behalf of `subscriber`.
        self.configure_list(policy, policy)
        self.send(sender, request_address)
        self.confirm(subscriber)

    # --- Unmoderated list -------------------------------------------------

    def testSubscribeToUnmoderatedWithoutAddressNotOnList(self):
        self._request_and_confirm("free",
                                  "outsider@example.com",
                                  "foo-subscribe@example.com",
                                  "outsider@example.com")
        self.got_welcome("outsider@example.com")

    def testSubscribeToUnmoderatedWithoutAddressAlreadyOnList(self):
        self._request_and_confirm("free",
                                  "user1@example.com",
                                  "foo-subscribe@example.com",
                                  "user1@example.com")
        self.got_welcome("user1@example.com")

    def testSubscribeToUnmoderatedWithAddressNotOnList(self):
        self._request_and_confirm(
            "free",
            "somebody.else@example.com",
            "foo-subscribe-outsider=example.com@example.com",
            "outsider@example.com")
        self.got_welcome("outsider@example.com")

    def testSubscribeToUnmoderatedWithAddressAlreadyOnList(self):
        self._request_and_confirm(
            "free",
            "somebody.else@example.com",
            "foo-subscribe-user1=example.com@example.com",
            "user1@example.com")
        self.got_welcome("user1@example.com")

    # --- Moderated list, request sent by the subscriber --------------------

    def testSubscribeToModeratedWithoutAddressNotOnListApproved(self):
        self._request_and_confirm("moderated",
                                  "outsider@example.com",
                                  "foo-subscribe@example.com",
                                  "outsider@example.com")
        self.approve("outsider@example.com")
        self.got_welcome("outsider@example.com")

    def testSubscribeToModeratedWithoutAddressNotOnListRejected(self):
        self._request_and_confirm("moderated",
                                  "outsider@example.com",
                                  "foo-subscribe@example.com",
                                  "outsider@example.com")
        self.reject("outsider@example.com")

    def testSubscribeToModeratedWithoutAddressAlreadyOnListApproved(self):
        self._request_and_confirm("moderated",
                                  "user1@example.com",
                                  "foo-subscribe@example.com",
                                  "user1@example.com")
        self.approve("user1@example.com")
        self.got_welcome("user1@example.com")

    def testSubscribeToModeratedWithoutAddressAlreadyOnListRejected(self):
        self._request_and_confirm("moderated",
                                  "user1@example.com",
                                  "foo-subscribe@example.com",
                                  "user1@example.com")
        self.reject("user1@example.com")

    # --- Moderated list, request sent by a third party ---------------------

    def testSubscribeToModeratedWithAddressNotOnListApproved(self):
        self._request_and_confirm(
            "moderated",
            "somebody.else@example.com",
            "foo-subscribe-outsider=example.com@example.com",
            "outsider@example.com")
        self.approve("outsider@example.com")
        self.got_welcome("outsider@example.com")

    def testSubscribeToModeratedWithAddressNotOnListRejected(self):
        self._request_and_confirm(
            "moderated",
            "somebody.else@example.com",
            "foo-subscribe-outsider=example.com@example.com",
            "outsider@example.com")
        self.reject("outsider@example.com")

    def testSubscribeToModeratedWithAddressAlreadyOnListApproved(self):
        self._request_and_confirm(
            "moderated",
            "somebody.else@example.com",
            "foo-subscribe-user1=example.com@example.com",
            "user1@example.com")
        self.approve("user1@example.com")
        self.got_welcome("user1@example.com")

    def testSubscribeToModeratedWithAddressAlreadyOnListRejected(self):
        self._request_and_confirm(
            "moderated",
            "somebody.else@example.com",
            "foo-subscribe-user1=example.com@example.com",
            "user1@example.com")
        self.reject("user1@example.com")
860
class UnsubscriptionTestCases(IncomingBase):

    # Unsubscription requests sent to the test list foo@example.com, either
    # by the leaving address itself or by a third party naming the address
    # in the request. Every scenario expects the same dialogue: one
    # confirmation request, then one goodbye message.

    def confirm(self, recipient):
        # The list must have sent exactly one unsubscription confirmation
        # request to `recipient`. Answer it by mailing whatever match()
        # returned for it (presumably the address to reply to). The list's
        # reaction is checked by the caller.
        confirmation_address = self.match("foo-ignore@example.com",
                                          "foo-unsubyes-[^@]*@example.com",
                                          recipient,
                                          "Please confirm UNsubscription")
        self.no_more_mail()
        self.send("something.random@example.com", confirmation_address)

    def got_goodbye(self, recipient):
        # The only mail waiting must be the goodbye message for `recipient`.
        self.match("foo-ignore@example.com", None, recipient,
                   "Goodbye from")
        self.no_more_mail()

    def _unsubscribe(self, sender, request_address, leaver):
        # Whole dialogue shared by all tests: `sender` mails the request,
        # the confirmation sent to `leaver` is answered, and `leaver` must
        # then receive the goodbye.
        self.send(sender, request_address)
        self.confirm(leaver)
        self.got_goodbye(leaver)

    def testUnsubscribeWithoutAddressNotOnList(self):
        self._unsubscribe("outsider@example.com",
                          "foo-unsubscribe@example.com",
                          "outsider@example.com")

    def testUnsubscribeWithoutAddressOnList(self):
        self._unsubscribe("user1@example.com",
                          "foo-unsubscribe@example.com",
                          "user1@example.com")

    def testUnsubscribeWithAddressNotOnList(self):
        self._unsubscribe("somebody.else@example.com",
                          "foo-unsubscribe-outsider=example.com@example.com",
                          "outsider@example.com")

    def testUnsubscribeWithAddressOnList(self):
        self._unsubscribe("somebody.else@example.com",
                          "foo-unsubscribe-user1=example.com@example.com",
                          "user1@example.com")
902
903 class PostTestCases(IncomingBase):
904
905     msg = u"Subject: something \u00c4\n\nhello, world\n".encode("utf8")
906
907     def approve(self, user_recipient):
908         self.match("foo-ignore@example.com", None, user_recipient)
909         a = self.match("foo-ignore@example.com", 
910                        "foo-approve-[^@]*@example.com",
911                        "listmaster@example.com")
912         self.send("listmaster@example.com", a)
913
914     def reject(self, user_recipient):
915         self.match("foo-ignore@example.com", None, user_recipient)
916         a = self.match("foo-ignore@example.com", 
917                        "foo-reject-[^@]*@example.com",
918                        "listmaster@example.com")
919         self.send("listmaster@example.com", a)
920
921     def check_headers_are_encoded(self):
922         ok_chars = "\t\r\n"
923         for code in range(32, 127):
924             ok_chars = ok_chars + chr(code)
925         for mail in self.sent_mail:
926             text = mail["text"]
927             self.failUnless("\n\n" in text)
928             headers = text.split("\n\n")[0]
929             for c in headers:
930                 if c not in ok_chars: print headers
931                 self.failUnless(c in ok_chars)
932
933     def check_mail_to_list(self):
934         self.check_headers_are_encoded()
935         self.match("foo-bounce-.*@example.com", None, "USER1@example.com",
936                    body="hello, world",
937                    header="X-Foo: FOO",
938                    anti_header="Received:")
939         self.match("foo-bounce-.*@example.com", None, "user2@EXAMPLE.com",
940                    body="hello, world",
941                    header="x-foo: foo",
942                    anti_header="Received:")
943         self.no_more_mail()
944
945     def check_that_moderation_box_is_empty(self):
946         ml = self.mlm.open_list("foo@example.com")
947         self.failUnlessEqual(os.listdir(ml.moderation_box.boxdir), [])
948
949     def testSubscriberPostsToUnmoderated(self):
950         self.configure_list("free", "free")
951         self.send("user1@example.com", "foo@example.com", 
952                   self.msg)
953         self.check_mail_to_list()
954
955     def testOutsiderPostsToUnmoderated(self):
956         self.configure_list("free", "free")
957         self.send("outsider@example.com", "foo@example.com", self.msg)
958         self.check_mail_to_list()
959
960     def testSubscriberPostToAutomoderated(self):
961         self.configure_list("free", "auto")
962         self.check_that_moderation_box_is_empty()
963         self.send("user1@example.com", "foo@example.com", self.msg)
964         self.check_mail_to_list()
965         self.check_that_moderation_box_is_empty()
966
967     def testOutsiderPostsToAutomoderatedRejected(self):
968         self.configure_list("free", "auto")
969         self.check_that_moderation_box_is_empty()
970         self.send("outsider@example.com", "foo@example.com", self.msg)
971         self.reject("outsider@example.com")
972         self.check_that_moderation_box_is_empty()
973
974     def testOutsiderPostsToAutomoderatedApproved(self):
975         self.configure_list("free", "auto")
976         self.check_that_moderation_box_is_empty()
977         self.send("outsider@example.com", "foo@example.com", self.msg)
978         self.approve("outsider@example.com")
979         self.check_mail_to_list()
980         self.check_that_moderation_box_is_empty()
981
982     def testSubscriberPostsToModeratedRejected(self):
983         self.configure_list("free", "moderated")
984         self.check_that_moderation_box_is_empty()
985         self.send("user1@example.com", "foo@example.com", self.msg)
986         self.reject("user1@example.com")
987         self.check_that_moderation_box_is_empty()
988
989     def testOutsiderPostsToMderatedApproved(self):
990         self.configure_list("free", "moderated")
991         self.check_that_moderation_box_is_empty()
992         self.send("outsider@example.com", "foo@example.com", self.msg)
993         self.approve("outsider@example.com")
994         self.check_mail_to_list()
995         self.check_that_moderation_box_is_empty()
996
997     def testSubscriberPostsWithRequestToBeModerated(self):
998         self.configure_list("free", "free")
999
1000         self.check_that_moderation_box_is_empty()
1001         self.send("user1@example.com", "foo@example.com", self.msg,
1002                   force_moderation=1)
1003         self.match("foo-ignore@example.com", 
1004                    None, 
1005                    "user1@example.com", 
1006                    "Subject: Please wait")
1007         a = self.match("foo-ignore@example.com", 
1008                        "foo-approve-[^@]*@example.com", 
1009                        "listmaster@example.com")
1010         self.no_more_mail()
1011
1012         self.send("listmaster@example.com", a)
1013         self.check_mail_to_list()
1014         self.check_that_moderation_box_is_empty()
1015
1016     def testSubscriberPostsWithModerationOverride(self):
1017         self.configure_list("moderated", "moderated")
1018         self.send("user1@example.com", "foo@example.com", self.msg,
1019                   force_posting=1)
1020         self.check_mail_to_list()
1021         self.check_that_moderation_box_is_empty()
1022
1023 class BounceTestCases(IncomingBase):
1024
1025     def check_subscriber_status(self, must_be):
1026         ml = self.mlm.open_list("foo@example.com")
1027         for id in ml.subscribers.groups():
1028             self.failUnlessEqual(ml.subscribers.get(id, "status"), must_be)
1029
1030     def bounce_sent_mail(self):
1031         for m in self.sent_mail[:]:
1032             self.send("something@example.com", m["sender"], "eek")
1033             self.failUnlessEqual(len(self.sent_mail), 0)
1034
1035     def send_mail_to_list_then_bounce_everything(self):
1036         self.send("user@example.com", "foo@example.com", "hello")
1037         for m in self.sent_mail[:]:
1038             self.send("foo@example.com", m["sender"], "eek")
1039             self.failUnlessEqual(len(self.sent_mail), 0)
1040
1041     def testBounceOnceThenRecover(self):
1042         self.check_subscriber_status("ok")
1043         self.send_mail_to_list_then_bounce_everything()
1044
1045         self.check_subscriber_status("bounced")
1046         
1047         ml = self.mlm.open_list("foo@example.com")
1048         for id in ml.subscribers.groups():
1049             bounce_id = ml.subscribers.get(id, "bounce-id")
1050             self.failUnless(bounce_id)
1051             self.failUnless(ml.bounce_box.has(bounce_id))
1052
1053         bounce_ids = []
1054         now = time.time()
1055         ml = self.mlm.open_list("foo@example.com")
1056         ml.subscribers.lock()
1057         for id in ml.subscribers.groups():
1058             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
1059             self.failUnless(abs(timestamp - now) < 10.0)
1060             ml.subscribers.set(id, "timestamp-bounced", "69.0")
1061             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
1062         ml.subscribers.save()
1063
1064         self.mlm.cleaning_woman(no_op)
1065         self.check_subscriber_status("probed")
1066
1067         for bounce_id in bounce_ids:
1068             self.failUnless(ml.bounce_box.has(bounce_id))
1069
1070         self.mlm.cleaning_woman(no_op)
1071         ml = self.mlm.open_list("foo@example.com")
1072         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1073         self.check_subscriber_status("ok")
1074         for bounce_id in bounce_ids:
1075             self.failUnless(not ml.bounce_box.has(bounce_id))
1076
1077     def testBounceProbeAlso(self):
1078         self.check_subscriber_status("ok")
1079         self.send_mail_to_list_then_bounce_everything()
1080         self.check_subscriber_status("bounced")
1081         
1082         ml = self.mlm.open_list("foo@example.com")
1083         for id in ml.subscribers.groups():
1084             bounce_id = ml.subscribers.get(id, "bounce-id")
1085             self.failUnless(bounce_id)
1086             self.failUnless(ml.bounce_box.has(bounce_id))
1087
1088         bounce_ids = []
1089         now = time.time()
1090         ml = self.mlm.open_list("foo@example.com")
1091         ml.subscribers.lock()
1092         for id in ml.subscribers.groups():
1093             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
1094             self.failUnless(abs(timestamp - now) < 10.0)
1095             ml.subscribers.set(id, "timestamp-bounced", "69.0")
1096             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
1097         ml.subscribers.save()
1098
1099         self.sent_mail = []
1100         self.mlm.cleaning_woman(self.catch_sendmail)
1101         self.check_subscriber_status("probed")
1102         for bounce_id in bounce_ids:
1103             self.failUnless(ml.bounce_box.has(bounce_id))
1104         self.bounce_sent_mail()
1105         self.check_subscriber_status("probebounced")
1106
1107         self.mlm.cleaning_woman(no_op)
1108         ml = self.mlm.open_list("foo@example.com")
1109         self.failUnlessEqual(len(ml.subscribers.groups()), 0)
1110         for bounce_id in bounce_ids:
1111             self.failUnless(not ml.bounce_box.has(bounce_id))
1112
1113     def testCleaningWomanJoinsAndBounceSplitsGroups(self):
1114         # Check that each group contains one address and set the creation
1115         # timestamp to an ancient time.
1116         ml = self.mlm.open_list("foo@example.com")
1117         bouncedir = os.path.join(ml.dirname, "bounce-box")
1118         ml.subscribers.lock()
1119         for id in ml.subscribers.groups():
1120             addrs = ml.subscribers.in_group(id)
1121             self.failUnlessEqual(len(addrs), 1)
1122             bounce_id = ml.subscribers.get(id, "bounce-id")
1123             self.failUnlessEqual(bounce_id, "..notexist..")
1124             bounce_id = "bounce-" + id
1125             ml.subscribers.set(id, "bounce-id", bounce_id)
1126             bounce_path = os.path.join(bouncedir, bounce_id)
1127             self.failUnless(not os.path.isfile(bounce_path))
1128             f = open(bounce_path, "w")
1129             f.close()
1130             f = open(bounce_path + ".address", "w")
1131             f.close()
1132             self.failUnless(os.path.isfile(bounce_path))
1133             ml.subscribers.set(id, "timestamp-created", "1")
1134         ml.subscribers.save()
1135
1136         # Check that --cleaning-woman joins the two groups into one.
1137         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1138         self.mlm.cleaning_woman(no_op)
1139         ml = self.mlm.open_list("foo@example.com")
1140         self.failUnlessEqual(len(ml.subscribers.groups()), 1)
1141         self.failUnlessEqual(os.listdir(bouncedir), [])
1142         
1143         # Check that a bounce splits the single group.
1144         self.send_mail_to_list_then_bounce_everything()
1145         ml = self.mlm.open_list("foo@example.com")
1146         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1147         
1148         # Check that a --cleaning-woman immediately after doesn't join.
1149         # (The groups are new, thus shouldn't be joined for a week.)
1150         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1151         self.mlm.cleaning_woman(no_op)
1152         ml = self.mlm.open_list("foo@example.com")
1153         self.failUnlessEqual(len(ml.subscribers.groups()), 2)