git.sommitrealweird.co.uk Git - eoc.git/blob - eocTests.py
Handle folder headers with colons correctly. Based on patch from Johannes Berg.
[eoc.git] / eocTests.py
1 import unittest
2 import shutil
3 import os
4 import re
5 import time
6 import string
7
8 import eoc
9
# Name of the scratch directory handed to eoc.MailingListManager by the
# tests below; the same value is stored on the eoc module itself.
DOTDIR = eoc.DOTDIR = "dot-dir-for-testing"

# Set eoc's module-level "quiet" flag (presumably silences its diagnostic
# output while the tests run -- confirm against eoc).
eoc.quiet = 1
13
def no_op(*args, **kwargs):
    """Accept any positional or keyword arguments, do nothing, return None.

    Keyword arguments are accepted as well as positional ones so that this
    function can stand in for any callable, however it is invoked.
    """
    return None
16
class AddressCleaningTestCases(unittest.TestCase):
    """Check the result of eoc.AddressParser.clean() for various inputs."""

    def setUp(self):
        self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
                                     "bar@lists.example.com"])

    def verify(self, address, wanted, skip_prefix=None, forced_domain=None):
        """Clean `address` and require that the result equals `wanted`.

        `skip_prefix` and `forced_domain` are installed on the parser
        (None when not given) before the address is cleaned.
        """
        self.ap.set_skip_prefix(skip_prefix)
        self.ap.set_forced_domain(forced_domain)
        # assertEqual instead of the failUnlessEqual alias, which newer
        # unittest versions no longer provide.
        self.assertEqual(self.ap.clean(address), wanted)

    def testSimpleAddress(self):
        self.verify("foo@example.com", "foo@example.com")

    def testUpperCaseAddress(self):
        self.verify("FOO@EXAMPLE.COM", "foo@example.com")

    def testPrefixRemoval(self):
        for address in ["foo@example.com", "user-foo@example.com"]:
            self.verify(address, "foo@example.com", skip_prefix="user-")

    def testForcedDomain(self):
        for address in ["foo@example.com", "foo@whatever.example.com"]:
            self.verify(address, "foo@example.com",
                        forced_domain="example.com")

    def testPrefixRemovalWithForcedDomain(self):
        # Every combination of "with/without prefix" and "right/other
        # domain" must clean to the same address.
        addresses = [
            "foo@example.com",
            "foo@whatever.example.com",
            "user-foo@example.com",
            "user-foo@whatever.example.com",
        ]
        for address in addresses:
            self.verify(address, "foo@example.com",
                        skip_prefix="user-",
                        forced_domain="example.com")
60
class AddressParserTestCases(unittest.TestCase):
    """Check the (listname, parts) pairs returned by AddressParser.parse()."""

    def setUp(self):
        self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
                                     "bar@lists.example.com"])

    def verify_parser(self, address, wanted_listname, wanted_parts):
        """Parse `address` and compare both halves of the result."""
        listname, parts = self.ap.parse(address)
        # assertEqual instead of the failUnlessEqual alias, which newer
        # unittest versions no longer provide.
        self.assertEqual(listname, wanted_listname)
        self.assertEqual(parts, wanted_parts)

    def testParser(self):
        # (address, expected command parts); the expected list name is the
        # one given to the parser in setUp, with its original capitalisation.
        cases = [
            ("foo@example.com", []),
            ("foo-subscribe@example.com", ["subscribe"]),
            ("foo-subscribe-joe=example.com@example.com",
             ["subscribe", "joe=example.com"]),
            ("foo-bounce-123-ABCDEF@example.com",
             ["bounce", "123", "abcdef"]),
        ]
        for address, wanted_parts in cases:
            self.verify_parser(address, "foo@EXAMPLE.com", wanted_parts)
85
class ParseRecipientAddressBase(unittest.TestCase):
    """Common fixture for the recipient address parsing tests.

    Provides self.lists (the known list names) and self.mlm, a
    MailingListManager constructed on DOTDIR with those lists.
    """

    def setUp(self):
        self.lists = ["foo@example.com",
                      "bar@lists.example.com",
                      "foo-announce@example.com"]
        self.mlm = eoc.MailingListManager(DOTDIR, lists=self.lists)

    def tearDown(self):
        # Constructing a manager can create DOTDIR (DotDirTestCases checks
        # exactly that), so remove it again to avoid leaking state into
        # tests that require the directory to be absent.
        self.mlm = None
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)

    def environ(self, sender, recipient):
        """Hand eoc an environment holding SENDER and RECIPIENT."""
        eoc.set_environ({
            "SENDER": sender,
            "RECIPIENT": recipient,
        })
99
class ParseUnsignedAddressTestCases(ParseRecipientAddressBase):
    """parse_recipient_address() on addresses that carry no signature."""

    def testEmpty(self):
        self.assertRaises(eoc.UnknownList,
                          self.mlm.parse_recipient_address,
                          "", None, None)

    def verify(self, address, skip_prefix, forced_domain, wanted_dict):
        """Parse `address` and require the result to equal `wanted_dict`."""
        parsed = self.mlm.parse_recipient_address(address, skip_prefix,
                                                  forced_domain)
        self.assertEqual(parsed, wanted_dict)

    def testSimpleAddresses(self):
        # (address, skip_prefix, forced_domain, expected list name)
        cases = [
            ("foo@example.com", None, None, "foo@example.com"),
            ("FOO@EXAMPLE.COM", None, None, "foo@example.com"),
            ("prefix-foo@example.com", "prefix-", None, "foo@example.com"),
            ("bar@example.com", None, "lists.example.com",
             "bar@lists.example.com"),
            ("prefix-bar@example.com", "prefix-", "lists.example.com",
             "bar@lists.example.com"),
        ]
        for address, skip_prefix, forced_domain, name in cases:
            self.verify(address, skip_prefix, forced_domain,
                        {"name": name, "command": "post"})

    def testSubscription(self):
        for command in ["subscribe", "unsubscribe"]:
            # Without an embedded address the sender comes back empty.
            self.verify("foo-%s@example.com" % command,
                        None,
                        None,
                        {"name": "foo@example.com",
                         "command": command,
                         "sender": "",
                        })
            # With an embedded address, "=" stands for the "@".
            self.verify("foo-%s-joe-user=example.com@example.com" % command,
                        None,
                        None,
                        {"name": "foo@example.com",
                         "command": command,
                         "sender": "joe-user@example.com",
                        })

    def testPost(self):
        for name in self.lists:
            self.verify(name, None, None, {"name": name, "command": "post"})

    def testSimpleCommands(self):
        for name in self.lists:
            # Split once per list rather than once per command.
            localpart, domain = name.split("@")
            for command in ["help", "list", "owner"]:
                address = "%s-%s@%s" % (localpart, command, domain)
                self.verify(address, None, None,
                            {"name": name,
                             "command": command,
                            })
177
class ParseWellSignedAddressTestCases(ParseRecipientAddressBase):
    """Signed command addresses whose hash is correct must be accepted."""

    def try_good_signature(self, command):
        """Sign a `command` address for foo-announce with id 1 and parse it.

        The hash is computed by the manager itself over the unsigned
        address and appended to the local part.
        """
        unsigned = "foo-announce-%s-1" % command
        signature = self.mlm.compute_hash("%s@%s" % (unsigned, "example.com"))
        local_part = "%s-%s" % (unsigned, signature)
        parsed = self.mlm.parse_recipient_address(
            "%s@example.com" % local_part, None, None)
        self.assertEqual(parsed,
                         {
                            "name": "foo-announce@example.com",
                            "command": command,
                            "id": "1",
                         })

    def testProperlySignedCommands(self):
        commands = ["subyes", "subapprove", "subreject", "unsubyes",
                    "bounce", "approve", "reject", "probe"]
        for command in commands:
            self.try_good_signature(command)
202
class ParseBadlySignedAddressTestCases(ParseRecipientAddressBase):
    """Signed command addresses with a wrong hash must be rejected."""

    def try_bad_signature(self, command_part):
        """Require eoc.BadSignature for `command_part` with a bogus hash."""
        address = "foo-announce-" + command_part + "-123-badhash@example.com"
        self.assertRaises(eoc.BadSignature,
                          self.mlm.parse_recipient_address,
                          address,
                          None, None)

    def testBadlySignedCommands(self):
        commands = ["subyes", "subapprove", "subreject", "unsubyes",
                    "bounce", "approve", "reject", "probe"]
        for command in commands:
            self.try_bad_signature(command)
221
class DotDirTestCases(unittest.TestCase):
    """Constructing a MailingListManager must set up the dot-directory.

    Afterwards DOTDIR has to be a directory containing a file "secret".
    """

    def setUp(self):
        self.secret_name = os.path.join(DOTDIR, "secret")
        # Both tests start from "DOTDIR does not exist"; get rid of anything
        # an earlier, interrupted run may have left behind.
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)

    def tearDown(self):
        # Guarded so that a test failing before DOTDIR was created reports
        # its own failure and not an additional error from rmtree.
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)

    def dotdir_is_ok(self):
        """Require DOTDIR to be a directory that contains the secret file."""
        self.assertTrue(os.path.isdir(DOTDIR))
        self.assertTrue(os.path.isfile(self.secret_name))

    def testNoDotDirExists(self):
        self.assertFalse(os.path.exists(DOTDIR))
        mlm = eoc.MailingListManager(DOTDIR)
        self.dotdir_is_ok()

    def testDotDirDoesExistButSecretDoesNot(self):
        self.assertFalse(os.path.exists(DOTDIR))
        os.makedirs(DOTDIR)
        self.assertTrue(os.path.isdir(DOTDIR))
        self.assertFalse(os.path.exists(self.secret_name))
        mlm = eoc.MailingListManager(DOTDIR)
        self.dotdir_is_ok()
246
247
class RemoveSomeHeadersTest(unittest.TestCase):
    """MailingList.remove_some_headers() must drop whole header fields."""

    def tearDown(self):
        # Constructing the manager can create DOTDIR (DotDirTestCases checks
        # exactly that); do not leave it behind for other tests.
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)

    def testRemoveSomeHeaders(self):
        mlm = eoc.MailingListManager(DOTDIR)
        ml = eoc.MailingList(mlm, "list@example.com")
        # Header-2 spans several lines and its last continuation line
        # contains a colon; all of it has to go, and nothing else.
        mail = """\
Header-1: this is a simple header
Header-2: this
    is
    a
    complex header with a colon: yes it is
Header-3: odd numbered headers are simple

Body.
"""
        wanted = """\
Header-1: this is a simple header
Header-3: odd numbered headers are simple

Body.
"""
        mail2 = ml.remove_some_headers(mail, ["Header-2"])
        self.assertEqual(mail2, wanted)
270
class ListBase(unittest.TestCase):
    """Fixture giving each test a MailingListManager on a fresh DOTDIR."""

    def setUp(self):
        # Throw away whatever an earlier test left in the scratch directory.
        stale = os.path.exists(DOTDIR)
        if stale:
            shutil.rmtree(DOTDIR)
        self.mlm = eoc.MailingListManager(DOTDIR)

    def tearDown(self):
        # Drop the manager first, then delete the scratch directory.
        self.mlm = None
        shutil.rmtree(DOTDIR)
281
class ListCreationTestCases(ListBase):
    """Creating a list must produce the expected directory layout."""

    def setUp(self):
        ListBase.setUp(self)
        # Directory entries not yet accounted for; filled by listdir_is_ok.
        self.names = None

    def listdir(self, listname):
        """Return the path of the directory belonging to `listname`."""
        return os.path.join(DOTDIR, listname)

    def listdir_has_file(self, listdir, filename):
        """Require a regular file `filename` and tick it off self.names."""
        self.assertTrue(os.path.isfile(os.path.join(listdir, filename)))
        self.names.remove(filename)

    def listdir_has_dir(self, listdir, dirname):
        """Require a directory `dirname` and tick it off self.names."""
        self.assertTrue(os.path.isdir(os.path.join(listdir, dirname)))
        self.names.remove(dirname)

    def listdir_may_have_dir(self, listdir, dirname):
        """Like listdir_has_dir, but only if the entry exists at all."""
        if dirname in self.names:
            self.listdir_has_dir(listdir, dirname)

    def listdir_is_ok(self, listname):
        """Require the list directory to hold exactly the expected entries."""
        listdir = self.listdir(listname)
        self.assertTrue(os.path.isdir(listdir))
        self.names = os.listdir(listdir)

        for filename in ["config", "subscribers"]:
            self.listdir_has_file(listdir, filename)

        for dirname in ["bounce-box", "subscription-box"]:
            self.listdir_has_dir(listdir, dirname)

        for dirname in ["moderation-box", "templates"]:
            self.listdir_may_have_dir(listdir, dirname)

        # Make sure there are no extras.
        self.assertEqual(self.names, [])

    def testCreateNew(self):
        self.assertFalse(os.path.exists(self.listdir("foo@example.com")))
        ml = self.mlm.create_list("foo@example.com")

        self.assertEqual(ml.__class__, eoc.MailingList)
        self.assertEqual(ml.dirname, self.listdir("foo@example.com"))

        self.listdir_is_ok("foo@example.com")

    def testCreateExisting(self):
        ml = self.mlm.create_list("foo@example.com")
        self.assertRaises(eoc.ListExists,
                          self.mlm.create_list, "foo@example.com")
        # The failed second attempt must leave the first list intact.
        self.listdir_is_ok("foo@example.com")
334
class ListOptionTestCases(ListBase):
    """Default list options, and persistence of changed options."""

    # Contents of the "list" config section that the tests expect for a
    # freshly created list.
    DEFAULT_OPTIONS = {
        "owners": "",
        "moderators": "",
        "subscription": "free",
        "posting": "free",
        "archived": "no",
        "mail-on-subscription-changes": "no",
        "mail-on-forced-unsubscribe": "no",
        "ignore-bounce": "no",
        "language": "",
        "pristine-headers": "",
    }

    def check(self, ml, wanted):
        """Require "list" to be the only section and to equal `wanted`."""
        self.assertEqual(ml.cp.sections(), ["list"])
        self.assertEqual(dict(ml.cp.items("list")), wanted)

    def testDefaultOptionsOnCreateAndOpenExisting(self):
        self.mlm.create_list("foo@example.com")
        ml = self.mlm.open_list("foo@example.com")
        self.check(ml, self.DEFAULT_OPTIONS)

    def testChangeOptions(self):
        # Create a list, change some options, and save the result.
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.cp.get("list", "owners"), "")
        self.assertEqual(ml.cp.get("list", "posting"), "free")
        ml.cp.set("list", "owners", "owner@example.com")
        ml.cp.set("list", "posting", "moderated")
        ml.save_config()

        # Re-open the list and check that the new instance has read the
        # values from the disk correctly: only the two changed options may
        # differ from the defaults.
        wanted = dict(self.DEFAULT_OPTIONS)
        wanted["owners"] = "owner@example.com"
        wanted["posting"] = "moderated"
        ml2 = self.mlm.open_list("foo@example.com")
        self.check(ml2, wanted)
386
class SubscriberDatabaseTestCases(ListBase):
    """Adding, removing, grouping and persisting list subscribers."""

    def has_subscribers(self, ml, addrs):
        """Require the sorted subscriber list of `ml` to equal `addrs`."""
        self.assertEqual(sorted(ml.subscribers.get_all()), addrs)

    def testAddAndRemoveSubscribers(self):
        addrs = ["joe@example.com", "MARY@example.com", "bubba@EXAMPLE.com"]
        addrs.sort()

        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.get_all(), [])

        self.assertTrue(ml.subscribers.lock())
        ml.subscribers.add_many(addrs)
        self.has_subscribers(ml, addrs)
        ml.subscribers.save()
        # Saving is expected to give the lock back.
        self.assertFalse(ml.subscribers.locked)
        ml = None

        # A freshly opened instance must see what was saved.
        ml2 = self.mlm.open_list("foo@example.com")
        self.has_subscribers(ml2, addrs)
        ml2.subscribers.lock()
        ml2.subscribers.remove(addrs[0])
        self.has_subscribers(ml2, addrs[1:])

        ml2.subscribers.save()

        ml3 = self.mlm.open_list("foo@example.com")
        self.has_subscribers(ml3, addrs[1:])

    def testSubscribeTwice(self):
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.get_all(), [])
        ml.subscribers.lock()
        ml.subscribers.add("user@example.com")
        ml.subscribers.add("USER@example.com")
        # Compare real lists of lower-cased strings: string.lower() and a
        # list-returning map() are not available on every Python version.
        stored = [addr.lower() for addr in ml.subscribers.get_all()]
        self.assertEqual(stored, ["USER@example.com".lower()])

    def testSubscriberAttributesAndGroups(self):
        addrs = ["joe@example.com", "mary@example.com"]
        addrs.sort()
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.groups(), [])
        ml.subscribers.lock()
        # add_many returns the id of the group the addresses were put in.
        group_id = ml.subscribers.add_many(addrs)
        self.assertEqual(ml.subscribers.groups(), ["0"])
        self.assertEqual(ml.subscribers.get(group_id, "status"), "ok")
        ml.subscribers.set(group_id, "status", "bounced")
        self.assertEqual(ml.subscribers.get(group_id, "status"), "bounced")
        self.assertEqual(sorted(ml.subscribers.in_group(group_id)), addrs)
442
class ModerationBoxTestCases(ListBase):
    """Storing, reading back and removing a message in the moderation box."""

    def testModerationBox(self):
        ml = self.mlm.create_list("foo@example.com")
        listdir = os.path.join(DOTDIR, "foo@example.com")
        boxdir = os.path.join(listdir, "moderation-box")

        self.assertEqual(boxdir, ml.moderation_box.boxdir)
        self.assertTrue(os.path.isdir(boxdir))

        mailtext = "From: foo\nTo: bar\n\nhello\n"
        box_id = ml.moderation_box.add("foo", mailtext)
        self.assertTrue(ml.moderation_box.has(box_id))
        self.assertEqual(ml.moderation_box.get_address(box_id), "foo")
        self.assertEqual(ml.moderation_box.get(box_id), mailtext)

        # On disk: the message under its id, the address next to it.
        filename = os.path.join(boxdir, box_id)
        self.assertTrue(os.path.isfile(filename))
        self.assertTrue(os.path.isfile(filename + ".address"))

        ml.moderation_box.remove(box_id)
        self.assertFalse(ml.moderation_box.has(box_id))
        self.assertFalse(os.path.exists(filename))
466
class IncomingBase(unittest.TestCase):
    """Fixture and helpers for tests that feed incoming mail to a list.

    setUp creates the list foo@EXAMPLE.com with the owner
    listmaster@example.com and the subscribers USER1@example.com and
    user2@EXAMPLE.com.  send() replaces the manager's send_mail with
    catch_sendmail(), so everything the list sends ends up in
    self.sent_mail, where match() and no_more_mail() can examine it.
    """

    def setUp(self):
        if os.path.isdir(DOTDIR):
            shutil.rmtree(DOTDIR)
        self.mlm = eoc.MailingListManager(DOTDIR)
        self.ml = None
        ml = self.mlm.create_list("foo@EXAMPLE.com")
        ml.cp.set("list", "owners", "listmaster@example.com")
        ml.save_config()
        ml.subscribers.lock()
        ml.subscribers.add("USER1@example.com")
        ml.subscribers.add("user2@EXAMPLE.com")
        ml.subscribers.save()
        self.write_file_in_listdir(ml, "headers-to-add", "X-Foo: foo\n")
        self.write_file_in_listdir(ml, "headers-to-remove", "Received\n")
        self.sent_mail = []

    def tearDown(self):
        shutil.rmtree(DOTDIR)

    def write_file_in_listdir(self, ml, basename, contents):
        """Write `contents` to the file `basename` in the directory of `ml`."""
        f = open(os.path.join(ml.dirname, basename), "w")
        try:
            f.write(contents)
        finally:
            # Close the file even when the write fails.
            f.close()

    def configure_list(self, subscription, posting):
        """Set and save the subscription and posting options of the list."""
        ml = self.mlm.open_list("foo@example.com")
        ml.cp.set("list", "subscription", subscription)
        ml.cp.set("list", "posting", posting)
        ml.save_config()

    def environ(self, sender, recipient):
        """Hand eoc an environment holding SENDER and RECIPIENT."""
        eoc.set_environ({
            "SENDER": sender,
            "RECIPIENT": recipient,
        })

    def catch_sendmail(self, sender, recipients, text):
        """Record an outgoing mail in self.sent_mail instead of sending it."""
        self.sent_mail.append({
            "sender": sender,
            "recipients": recipients,
            "text": text,
        })

    def send(self, sender, recipient, text="", force_moderation=0,
             force_posting=0):
        """Let the list addressed by `recipient` process one incoming mail.

        `text` gets an empty header section if it contains no blank line,
        and always a leading "Received:" header.  self.sent_mail is emptied
        first, so afterwards it holds only the mail caused by this call.
        """
        self.environ(sender, recipient)
        parsed = self.mlm.parse_recipient_address(recipient, None, None)
        parsed["force-moderation"] = force_moderation
        parsed["force-posting"] = force_posting
        self.ml = self.mlm.open_list(parsed["name"])
        if "\n\n" not in text:
            text = "\n\n" + text
        text = "Received: foobar\n" + text
        # The list reads the mail via read_stdin; feed it our text instead.
        self.ml.read_stdin = lambda t=text: t
        self.mlm.send_mail = self.catch_sendmail
        self.sent_mail = []
        self.ml.obey(parsed)

    def sender_matches(self, mail, sender):
        """Return the matched sender of `mail`, or None.

        `sender` is a regular expression, matched case-insensitively at the
        start of the envelope sender.
        """
        pat = "(?P<address>" + sender + ")"
        m = re.match(pat, mail["sender"], re.I)
        if m is None:
            return None
        return m.group("address")

    def replyto_matches(self, mail, replyto):
        """Return text matching the regexp `replyto` in the mail, or None.

        The whole mail text is searched, case-insensitively.
        """
        pat = "(.|\n)*(?P<address>" + replyto + ")"
        m = re.match(pat, mail["text"], re.I)
        if m is None:
            return None
        return m.group("address")

    def receiver_matches(self, mail, recipient):
        """Tell whether `recipient` is the one and only recipient of `mail`.

        The comparison ignores case.
        """
        # Build a real list of lower-cased strings: string.lower() and a
        # list-returning map() are not available on every Python version.
        lowered = [address.lower() for address in mail["recipients"]]
        return lowered == [recipient.lower()]

    def body_matches(self, mail, body):
        """Return a true value if the regexp `body` occurs in the mail text.

        An empty or missing `body` matches every mail.
        """
        if body:
            pat = re.compile("(.|\n)*" + body + "(.|\n)*")
            m = re.match(pat, mail["text"])
            return m
        else:
            return 1

    def headers_match(self, mail, header):
        """Like body_matches(), but ignoring case."""
        if header:
            pat = re.compile("(.|\n)*" + header + "(.|\n)*", re.I)
            m = re.match(pat, mail["text"])
            return m
        else:
            return 1

    def _matching_address(self, mail, sender, replyto, receiver, body,
                          header, anti_header):
        """Return the address match() reports for `mail`, or None.

        That is the matched sender when `replyto` is None and the matched
        reply address otherwise; None means the mail fails a criterion.
        """
        m1 = self.sender_matches(mail, sender)
        m3 = self.receiver_matches(mail, receiver)
        m4 = self.body_matches(mail, body)
        m5 = self.headers_match(mail, header)
        m6 = self.headers_match(mail, anti_header)
        no_anti_header = anti_header is None or m6 is None
        if m1 is None or not (m3 and m4 and m5 and no_anti_header):
            return None
        if replyto is None:
            return m1
        return self.replyto_matches(mail, replyto)

    def match(self, sender, replyto, receiver, body=None, header=None,
              anti_header=None):
        """Find, remove and report the first caught mail fitting all criteria.

        `sender`, `replyto`, `body`, `header` and `anti_header` are regular
        expressions; `receiver` is a plain address.  `anti_header` must NOT
        occur in the mail.  The test fails if no caught mail fits.  Returns
        the matched reply address, or the matched sender if `replyto` is
        None.
        """
        ret = None
        for mail in self.sent_mail:
            ret = self._matching_address(mail, sender, replyto, receiver,
                                         body, header, anti_header)
            if ret is not None:
                # Safe although we are iterating: the loop ends right here.
                self.sent_mail.remove(mail)
                break
        self.assertTrue(ret is not None)
        return ret

    def no_more_mail(self):
        """Require that every caught mail has been consumed by match()."""
        self.assertEqual(self.sent_mail, [])
595
596
class SimpleCommandAddressTestCases(IncomingBase):
    # Commands that need neither a signature nor a confirmation step.

    def testHelp(self):
        # A help request is answered to whoever sent it.
        self.send(sender="outsider@example.com",
                  recipient="foo-help@example.com")
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="outsider@example.com",
                   body="Subject: Help for")
        self.no_more_mail()

    def testOwner(self):
        # Mail to the -owner address reaches the configured list owner.
        self.send(sender="outsider@example.com",
                  recipient="foo-owner@example.com",
                  text="abcde")
        self.match(sender="outsider@example.com",
                   replyto=None,
                   receiver="listmaster@example.com",
                   body="abcde")
        self.no_more_mail()

    def testIgnore(self):
        # Mail to the -ignore address must not cause any outgoing mail.
        self.send(sender="outsider@example.com",
                  recipient="foo-ignore@example.com",
                  text="abcde")
        self.no_more_mail()
614
class OwnerCommandTestCases(IncomingBase):
    # Commands reserved for the list owner (listmaster@example.com in the
    # fixture), tried both by the owner and by an outsider.

    def testList(self):
        # One line per subscriber, in either order and any capitalisation.
        subscriber_line = ("[uU][sS][eE][rR][12]@" +
                           "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n")
        self.send(sender="listmaster@example.com",
                  recipient="foo-list@example.com")
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="listmaster@example.com",
                   body=subscriber_line + subscriber_line)
        self.no_more_mail()

    def testListDenied(self):
        self.send(sender="outsider@example.com",
                  recipient="foo-list@example.com")
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="outsider@example.com",
                   body="Subject: Subscriber list denied")
        self.no_more_mail()

    def testSetlist(self):
        # The owner sends the new list and is asked to confirm it.
        self.send(sender="listmaster@example.com",
                  recipient="foo-setlist@example.com",
                  text="From: foo\n\nnew1@example.com\nuser1@example.com\n")
        confirm_address = self.match(
            sender="foo-ignore@example.com",
            replyto="foo-setlistyes-[^@]*@example.com",
            receiver="listmaster@example.com",
            body="Subject: Please moderate subscriber list")
        self.no_more_mail()

        # After confirmation: a notice to the owner, a welcome for the
        # added address and a goodbye for the dropped one.
        self.send(sender="listmaster@example.com",
                  recipient=confirm_address)
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="listmaster@example.com",
                   body="Subject: Subscriber list has been changed")
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="new1@example.com",
                   body="Subject: Welcome to")
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="user2@EXAMPLE.com",
                   body="Subject: Goodbye from")
        self.no_more_mail()

    def testSetlistSilently(self):
        self.send(sender="listmaster@example.com",
                  recipient="foo-setlistsilently@example.com",
                  text="From: foo\n\nnew1@example.com\nuser1@example.com\n")
        confirm_address = self.match(
            sender="foo-ignore@example.com",
            replyto="foo-setlistsilentyes-[^@]*@example.com",
            receiver="listmaster@example.com",
            body="Subject: Please moderate subscriber list")
        self.no_more_mail()

        # The silent variant tells the owner only, not the subscribers.
        self.send(sender="listmaster@example.com",
                  recipient=confirm_address)
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="listmaster@example.com",
                   body="Subject: Subscriber list has been changed")
        self.no_more_mail()

    def testSetlistDenied(self):
        self.send(sender="outsider@example.com",
                  recipient="foo-setlist@example.com",
                  text="From: foo\n\nnew1@example.com\nnew2@example.com\n")
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="outsider@example.com",
                   body="Subject: You can't set the subscriber list")
        self.no_more_mail()

    def testSetlistBadlist(self):
        # The body is not a list of addresses.
        self.send(sender="listmaster@example.com",
                  recipient="foo-setlist@example.com",
                  text="From: foo\n\nBlah blah blah.\n")
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="listmaster@example.com",
                   body="Subject: Bad address list")
        self.no_more_mail()

    def testOwnerSubscribesSomeoneElse(self):
        # Send subscription request. List sends confirmation request.
        self.send(sender="listmaster@example.com",
                  recipient="foo-subscribe-outsider=example.com@example.com")
        confirm_address = self.match(
            sender="foo-ignore@example.com",
            replyto="foo-subyes-[^@]*@example.com",
            receiver="listmaster@example.com",
            body="Please confirm subscription")
        self.no_more_mail()

        # Confirm sub. req. List sends welcome.
        self.send(sender="listmaster@example.com",
                  recipient=confirm_address)
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="outsider@example.com",
                   body="Welcome to the")
        self.no_more_mail()

    def testOwnerUnubscribesSomeoneElse(self):
        # Send unsubscription request. List sends confirmation request.
        self.send(sender="listmaster@example.com",
                  recipient="foo-unsubscribe-outsider=example.com@example.com")
        confirm_address = self.match(
            sender="foo-ignore@example.com",
            replyto="foo-unsubyes-[^@]*@example.com",
            receiver="listmaster@example.com",
            body="Subject: Please confirm UNsubscription")
        self.no_more_mail()

        # Confirm unsub. req. List sends goodbye.
        self.send(sender="listmaster@example.com",
                  recipient=confirm_address)
        self.match(sender="foo-ignore@example.com",
                   replyto=None,
                   receiver="outsider@example.com",
                   body="Goodbye")
        self.no_more_mail()
715
716 class SubscriptionTestCases(IncomingBase):
717
718     def confirm(self, recipient):
719         # List has sent confirmation request. Respond to it.
720         a = self.match("foo-ignore@example.com", 
721                        "foo-subyes-[^@]*@example.com", 
722                        recipient,
723                        "Please confirm subscription")
724         self.no_more_mail()
725         
726         # Confirm sub. req. List response will be analyzed later.
727         self.send("something.random@example.com", a)
728
729     def got_welcome(self, recipient):
730         self.match("foo-ignore@example.com", 
731                    None, 
732                    recipient, 
733                    "Welcome to the")
734         self.no_more_mail()
735
736     def approve(self, user_recipient):
737         self.match("foo-ignore@example.com", None, user_recipient)
738         a = self.match("foo-ignore@example.com", 
739                        "foo-subapprove-[^@]*@example.com",
740                        "listmaster@example.com")
741         self.send("listmaster@example.com", a)
742
743     def reject(self, user_recipient):
744         self.match("foo-ignore@example.com", None, user_recipient)
745         a = self.match("foo-ignore@example.com", 
746                        "foo-subreject-[^@]*@example.com",
747                        "listmaster@example.com")
748         self.send("listmaster@example.com", a)
749
750     def testSubscribeToUnmoderatedWithoutAddressNotOnList(self):
751         self.configure_list("free", "free")
752         self.send("outsider@example.com", "foo-subscribe@example.com")
753         self.confirm("outsider@example.com")
754         self.got_welcome("outsider@example.com")
755
756     def testSubscribeToUnmoderatedWithoutAddressAlreadyOnList(self):
757         self.configure_list("free", "free")
758         self.send("user1@example.com", "foo-subscribe@example.com")
759         self.confirm("user1@example.com")
760         self.got_welcome("user1@example.com")
761
762     def testSubscribeToUnmoderatedWithAddressNotOnList(self):
763         self.configure_list("free", "free")
764         self.send("somebody.else@example.com", 
765                   "foo-subscribe-outsider=example.com@example.com")
766         self.confirm("outsider@example.com")
767         self.got_welcome("outsider@example.com")
768
769     def testSubscribeToUnmoderatedWithAddressAlreadyOnList(self):
770         self.configure_list("free", "free")
771         self.send("somebody.else@example.com", 
772                   "foo-subscribe-user1=example.com@example.com")
773         self.confirm("user1@example.com")
774         self.got_welcome("user1@example.com")
775
776     def testSubscribeToModeratedWithoutAddressNotOnListApproved(self):
777         self.configure_list("moderated", "moderated")
778         self.send("outsider@example.com", "foo-subscribe@example.com")
779         self.confirm("outsider@example.com")
780         self.approve("outsider@example.com")
781         self.got_welcome("outsider@example.com")
782
783     def testSubscribeToModeratedWithoutAddressNotOnListRejected(self):
784         self.configure_list("moderated", "moderated")
785         self.send("outsider@example.com", "foo-subscribe@example.com")
786         self.confirm("outsider@example.com")
787         self.reject("outsider@example.com")
788
789     def testSubscribeToModeratedWithoutAddressAlreadyOnListApproved(self):
790         self.configure_list("moderated", "moderated")
791         self.send("user1@example.com", "foo-subscribe@example.com")
792         self.confirm("user1@example.com")
793         self.approve("user1@example.com")
794         self.got_welcome("user1@example.com")
795
796     def testSubscribeToModeratedWithoutAddressAlreadyOnListRejected(self):
797         self.configure_list("moderated", "moderated")
798         self.send("user1@example.com", "foo-subscribe@example.com")
799         self.confirm("user1@example.com")
800         self.reject("user1@example.com")
801
802     def testSubscribeToModeratedWithAddressNotOnListApproved(self):
803         self.configure_list("moderated", "moderated")
804         self.send("somebody.else@example.com", 
805                   "foo-subscribe-outsider=example.com@example.com")
806         self.confirm("outsider@example.com")
807         self.approve("outsider@example.com")
808         self.got_welcome("outsider@example.com")
809
810     def testSubscribeToModeratedWithAddressNotOnListRejected(self):
811         self.configure_list("moderated", "moderated")
812         self.send("somebody.else@example.com", 
813                   "foo-subscribe-outsider=example.com@example.com")
814         self.confirm("outsider@example.com")
815         self.reject("outsider@example.com")
816
817     def testSubscribeToModeratedWithAddressAlreadyOnListApproved(self):
818         self.configure_list("moderated", "moderated")
819         self.send("somebody.else@example.com", 
820                   "foo-subscribe-user1=example.com@example.com")
821         self.confirm("user1@example.com")
822         self.approve("user1@example.com")
823         self.got_welcome("user1@example.com")
824
825     def testSubscribeToModeratedWithAddressAlreadyOnListRejected(self):
826         self.configure_list("moderated", "moderated")
827         self.send("somebody.else@example.com", 
828                   "foo-subscribe-user1=example.com@example.com")
829         self.confirm("user1@example.com")
830         self.reject("user1@example.com")
831
class UnsubscriptionTestCases(IncomingBase):

    """Unsubscription requests, with and without an explicit address."""

    def confirm(self, recipient):
        """Answer the unsubscription confirmation request sent to recipient.

        The value match() returns for the foo-unsubyes-... pattern is used
        as the address the confirmation is sent to.
        """
        # The confirmation request must be the only mail the list sent.
        confirm_address = self.match("foo-ignore@example.com",
                                     "foo-unsubyes-[^@]*@example.com",
                                     recipient,
                                     "Please confirm UNsubscription")
        self.no_more_mail()

        # Confirm the unsubscription request from an unrelated sender
        # address.  The list's response is examined later by the caller.
        self.send("something.random@example.com", confirm_address)

    def got_goodbye(self, recipient):
        """Expect the goodbye mail for recipient and nothing after it.

        Calls match() for a mail from foo-ignore@example.com to recipient,
        passing "Goodbye from" as the fourth argument (presumably text the
        mail has to contain), then calls no_more_mail().
        """
        list_sender = "foo-ignore@example.com"
        goodbye_text = "Goodbye from"
        self.match(list_sender, None, recipient, goodbye_text)
        self.no_more_mail()

    def testUnsubscribeWithoutAddressNotOnList(self):
        # outsider@example.com writes to the plain unsubscribe address; a
        # confirmation request and then a goodbye are expected all the same.
        requester = "outsider@example.com"
        self.send(requester, "foo-unsubscribe@example.com")
        self.confirm(requester)
        self.got_goodbye(requester)

    def testUnsubscribeWithoutAddressOnList(self):
        # user1@example.com writes to the plain unsubscribe address,
        # confirms, and must get the goodbye message.
        requester = "user1@example.com"
        self.send(requester, "foo-unsubscribe@example.com")
        self.confirm(requester)
        self.got_goodbye(requester)

    def testUnsubscribeWithAddressNotOnList(self):
        # A third party sends the request; the address to remove is embedded
        # in the command address with '=' in place of '@'.  Confirmation and
        # goodbye are expected at that embedded address.
        target = "outsider@example.com"
        command_address = "foo-unsubscribe-outsider=example.com@example.com"
        self.send("somebody.else@example.com", command_address)
        self.confirm(target)
        self.got_goodbye(target)

    def testUnsubscribeWithAddressOnList(self):
        # A third party requests the removal of user1@example.com through
        # the command address; confirmation and goodbye go to user1.
        target = "user1@example.com"
        command_address = "foo-unsubscribe-user1=example.com@example.com"
        self.send("somebody.else@example.com", command_address)
        self.confirm(target)
        self.got_goodbye(target)
873
class PostTestCases(IncomingBase):

    """Posting to the list under free, auto and moderated settings."""

    # Test message whose Subject header carries a non-ASCII character
    # (U+00C4), UTF-8 encoded.  check_headers_are_encoded() verifies that
    # no such raw character survives in the headers of mail sent out.
    msg = u"Subject: something \u00c4\n\nhello, world\n".encode("utf8")

    def approve(self, user_recipient):
        """Act as the list master approving a held posting.

        First matches a mail addressed to user_recipient, then the mail
        addressed to listmaster@example.com.  Whatever match() returns for
        the foo-approve-... pattern is used as the address the list master
        writes to.
        """
        self.match("foo-ignore@example.com", None, user_recipient)
        a = self.match("foo-ignore@example.com",
                       "foo-approve-[^@]*@example.com",
                       "listmaster@example.com")
        self.send("listmaster@example.com", a)

    def reject(self, user_recipient):
        """Act as the list master rejecting a held posting.

        First matches a mail addressed to user_recipient, then the mail
        addressed to listmaster@example.com.  Whatever match() returns for
        the foo-reject-... pattern is used as the address the list master
        writes to.
        """
        self.match("foo-ignore@example.com", None, user_recipient)
        a = self.match("foo-ignore@example.com",
                       "foo-reject-[^@]*@example.com",
                       "listmaster@example.com")
        self.send("listmaster@example.com", a)

    def check_headers_are_encoded(self):
        """Check that the headers of every sent mail are plain ASCII.

        Allowed are tab, CR, LF and the character codes 32..126.  Each
        mail in self.sent_mail must contain a blank line; everything before
        the first blank line is treated as the header block.  On failure
        the offending character and the whole header block are reported in
        the assertion message instead of being printed to stdout.
        """
        ok_chars = "\t\r\n" + "".join([chr(code) for code in range(32, 127)])
        for mail in self.sent_mail:
            text = mail["text"]
            self.failUnless("\n\n" in text)
            headers = text.split("\n\n")[0]
            for c in headers:
                if c not in ok_chars:
                    self.fail("unencoded character %r in headers:\n%s"
                              % (c, headers))

    def check_mail_to_list(self):
        """Check that the posting went out to both subscribers only.

        After the header encoding check, one mail each is matched for
        USER1@example.com and user2@EXAMPLE.com, with a foo-bounce-...
        sender, the body text "hello, world", an X-Foo header and the
        anti_header "Received:" (presumably a header that must be absent).
        The two header arguments differ in case.  No further mail may
        remain.
        """
        self.check_headers_are_encoded()
        self.match("foo-bounce-.*@example.com", None, "USER1@example.com",
                   body="hello, world",
                   header="X-Foo: FOO",
                   anti_header="Received:")
        self.match("foo-bounce-.*@example.com", None, "user2@EXAMPLE.com",
                   body="hello, world",
                   header="x-foo: foo",
                   anti_header="Received:")
        self.no_more_mail()

    def check_that_moderation_box_is_empty(self):
        """Assert that the moderation box directory of the list is empty."""
        ml = self.mlm.open_list("foo@example.com")
        self.failUnlessEqual(os.listdir(ml.moderation_box.boxdir), [])

    def testSubscriberPostsToUnmoderated(self):
        # free/free list: a posting from user1 goes straight to the list.
        self.configure_list("free", "free")
        self.send("user1@example.com", "foo@example.com",
                  self.msg)
        self.check_mail_to_list()

    def testOutsiderPostsToUnmoderated(self):
        # free/free list: a posting from an outsider goes straight out too.
        self.configure_list("free", "free")
        self.send("outsider@example.com", "foo@example.com", self.msg)
        self.check_mail_to_list()

    def testSubscriberPostToAutomoderated(self):
        # free/auto list: user1's posting is delivered without leaving
        # anything in the moderation box.
        self.configure_list("free", "auto")
        self.check_that_moderation_box_is_empty()
        self.send("user1@example.com", "foo@example.com", self.msg)
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

    def testOutsiderPostsToAutomoderatedRejected(self):
        # free/auto list: an outsider's posting goes to the list master,
        # who rejects it; afterwards the moderation box is empty again.
        self.configure_list("free", "auto")
        self.check_that_moderation_box_is_empty()
        self.send("outsider@example.com", "foo@example.com", self.msg)
        self.reject("outsider@example.com")
        self.check_that_moderation_box_is_empty()

    def testOutsiderPostsToAutomoderatedApproved(self):
        # free/auto list: an outsider's posting goes to the list master,
        # who approves it; it is then delivered and the box is empty.
        self.configure_list("free", "auto")
        self.check_that_moderation_box_is_empty()
        self.send("outsider@example.com", "foo@example.com", self.msg)
        self.approve("outsider@example.com")
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

    def testSubscriberPostsToModeratedRejected(self):
        # free/moderated list: even user1's posting needs a decision; the
        # list master rejects it and the moderation box ends up empty.
        self.configure_list("free", "moderated")
        self.check_that_moderation_box_is_empty()
        self.send("user1@example.com", "foo@example.com", self.msg)
        self.reject("user1@example.com")
        self.check_that_moderation_box_is_empty()

    # NOTE: "Mderated" is a typo for "Moderated"; the name is kept so that
    # the test id stays the same for anyone selecting tests by name.
    def testOutsiderPostsToMderatedApproved(self):
        # free/moderated list: the outsider's posting is approved by the
        # list master, delivered, and the moderation box ends up empty.
        self.configure_list("free", "moderated")
        self.check_that_moderation_box_is_empty()
        self.send("outsider@example.com", "foo@example.com", self.msg)
        self.approve("outsider@example.com")
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

    def testSubscriberPostsWithRequestToBeModerated(self):
        # free/free list, but the posting is sent with force_moderation=1:
        # user1 gets a "Please wait" notice and the list master gets an
        # approval request, and nothing else is sent.
        self.configure_list("free", "free")

        self.check_that_moderation_box_is_empty()
        self.send("user1@example.com", "foo@example.com", self.msg,
                  force_moderation=1)
        self.match("foo-ignore@example.com",
                   None,
                   "user1@example.com",
                   "Subject: Please wait")
        a = self.match("foo-ignore@example.com",
                       "foo-approve-[^@]*@example.com",
                       "listmaster@example.com")
        self.no_more_mail()

        # The list master approves; the posting must now be delivered.
        self.send("listmaster@example.com", a)
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

    def testSubscriberPostsWithModerationOverride(self):
        # moderated/moderated list, but the posting is sent with
        # force_posting=1: it is delivered at once and nothing is held.
        self.configure_list("moderated", "moderated")
        self.send("user1@example.com", "foo@example.com", self.msg,
                  force_posting=1)
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()
993
994 class BounceTestCases(IncomingBase):
995
996     def check_subscriber_status(self, must_be):
997         ml = self.mlm.open_list("foo@example.com")
998         for id in ml.subscribers.groups():
999             self.failUnlessEqual(ml.subscribers.get(id, "status"), must_be)
1000
1001     def bounce_sent_mail(self):
1002         for m in self.sent_mail[:]:
1003             self.send("something@example.com", m["sender"], "eek")
1004             self.failUnlessEqual(len(self.sent_mail), 0)
1005
1006     def send_mail_to_list_then_bounce_everything(self):
1007         self.send("user@example.com", "foo@example.com", "hello")
1008         for m in self.sent_mail[:]:
1009             self.send("foo@example.com", m["sender"], "eek")
1010             self.failUnlessEqual(len(self.sent_mail), 0)
1011
1012     def testBounceOnceThenRecover(self):
1013         self.check_subscriber_status("ok")
1014         self.send_mail_to_list_then_bounce_everything()
1015
1016         self.check_subscriber_status("bounced")
1017         
1018         ml = self.mlm.open_list("foo@example.com")
1019         for id in ml.subscribers.groups():
1020             bounce_id = ml.subscribers.get(id, "bounce-id")
1021             self.failUnless(bounce_id)
1022             self.failUnless(ml.bounce_box.has(bounce_id))
1023
1024         bounce_ids = []
1025         now = time.time()
1026         ml = self.mlm.open_list("foo@example.com")
1027         ml.subscribers.lock()
1028         for id in ml.subscribers.groups():
1029             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
1030             self.failUnless(abs(timestamp - now) < 10.0)
1031             ml.subscribers.set(id, "timestamp-bounced", "69.0")
1032             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
1033         ml.subscribers.save()
1034
1035         self.mlm.cleaning_woman(no_op)
1036         self.check_subscriber_status("probed")
1037
1038         for bounce_id in bounce_ids:
1039             self.failUnless(ml.bounce_box.has(bounce_id))
1040
1041         self.mlm.cleaning_woman(no_op)
1042         ml = self.mlm.open_list("foo@example.com")
1043         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1044         self.check_subscriber_status("ok")
1045         for bounce_id in bounce_ids:
1046             self.failUnless(not ml.bounce_box.has(bounce_id))
1047
1048     def testBounceProbeAlso(self):
1049         self.check_subscriber_status("ok")
1050         self.send_mail_to_list_then_bounce_everything()
1051         self.check_subscriber_status("bounced")
1052         
1053         ml = self.mlm.open_list("foo@example.com")
1054         for id in ml.subscribers.groups():
1055             bounce_id = ml.subscribers.get(id, "bounce-id")
1056             self.failUnless(bounce_id)
1057             self.failUnless(ml.bounce_box.has(bounce_id))
1058
1059         bounce_ids = []
1060         now = time.time()
1061         ml = self.mlm.open_list("foo@example.com")
1062         ml.subscribers.lock()
1063         for id in ml.subscribers.groups():
1064             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
1065             self.failUnless(abs(timestamp - now) < 10.0)
1066             ml.subscribers.set(id, "timestamp-bounced", "69.0")
1067             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
1068         ml.subscribers.save()
1069
1070         self.sent_mail = []
1071         self.mlm.cleaning_woman(self.catch_sendmail)
1072         self.check_subscriber_status("probed")
1073         for bounce_id in bounce_ids:
1074             self.failUnless(ml.bounce_box.has(bounce_id))
1075         self.bounce_sent_mail()
1076         self.check_subscriber_status("probebounced")
1077
1078         self.mlm.cleaning_woman(no_op)
1079         ml = self.mlm.open_list("foo@example.com")
1080         self.failUnlessEqual(len(ml.subscribers.groups()), 0)
1081         for bounce_id in bounce_ids:
1082             self.failUnless(not ml.bounce_box.has(bounce_id))
1083
1084     def testCleaningWomanJoinsAndBounceSplitsGroups(self):
1085         # Check that each group contains one address and set the creation
1086         # timestamp to an ancient time.
1087         ml = self.mlm.open_list("foo@example.com")
1088         bouncedir = os.path.join(ml.dirname, "bounce-box")
1089         ml.subscribers.lock()
1090         for id in ml.subscribers.groups():
1091             addrs = ml.subscribers.in_group(id)
1092             self.failUnlessEqual(len(addrs), 1)
1093             bounce_id = ml.subscribers.get(id, "bounce-id")
1094             self.failUnlessEqual(bounce_id, "..notexist..")
1095             bounce_id = "bounce-" + id
1096             ml.subscribers.set(id, "bounce-id", bounce_id)
1097             bounce_path = os.path.join(bouncedir, bounce_id)
1098             self.failUnless(not os.path.isfile(bounce_path))
1099             f = open(bounce_path, "w")
1100             f.close()
1101             f = open(bounce_path + ".address", "w")
1102             f.close()
1103             self.failUnless(os.path.isfile(bounce_path))
1104             ml.subscribers.set(id, "timestamp-created", "1")
1105         ml.subscribers.save()
1106
1107         # Check that --cleaning-woman joins the two groups into one.
1108         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1109         self.mlm.cleaning_woman(no_op)
1110         ml = self.mlm.open_list("foo@example.com")
1111         self.failUnlessEqual(len(ml.subscribers.groups()), 1)
1112         self.failUnlessEqual(os.listdir(bouncedir), [])
1113         
1114         # Check that a bounce splits the single group.
1115         self.send_mail_to_list_then_bounce_everything()
1116         ml = self.mlm.open_list("foo@example.com")
1117         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1118         
1119         # Check that a --cleaning-woman immediately after doesn't join.
1120         # (The groups are new, thus shouldn't be joined for a week.)
1121         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
1122         self.mlm.cleaning_woman(no_op)
1123         ml = self.mlm.open_list("foo@example.com")
1124         self.failUnlessEqual(len(ml.subscribers.groups()), 2)