10 DOTDIR = "dot-dir-for-testing"
 
  17 class AddressCleaningTestCases(unittest.TestCase):
 
  20         self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
 
  21                                      "bar@lists.example.com"])
 
  23     def verify(self, address, wanted, skip_prefix=None, forced_domain=None):
 
  24         self.ap.set_skip_prefix(skip_prefix)
 
  25         self.ap.set_forced_domain(forced_domain)
 
  26         address = self.ap.clean(address)
 
  27         self.failUnlessEqual(address, wanted)
 
  29     def testSimpleAddress(self):
 
  30         self.verify("foo@example.com", "foo@example.com")
 
  32     def testUpperCaseAddress(self):
 
  33         self.verify("FOO@EXAMPLE.COM", "foo@example.com")
 
  35     def testPrefixRemoval(self):
 
  36         self.verify("foo@example.com", "foo@example.com", 
 
  38         self.verify("user-foo@example.com", "foo@example.com", 
 
  41     def testForcedDomain(self):
 
  42         self.verify("foo@example.com", "foo@example.com",
 
  43                     forced_domain="example.com")
 
  44         self.verify("foo@whatever.example.com", "foo@example.com", 
 
  45                     forced_domain="example.com")
 
  47     def testPrefixRemovalWithForcedDomain(self):
 
  48         self.verify("foo@example.com", "foo@example.com", 
 
  50                     forced_domain="example.com")
 
  51         self.verify("foo@whatever.example.com", "foo@example.com", 
 
  53                     forced_domain="example.com")
 
  54         self.verify("user-foo@example.com", "foo@example.com", 
 
  56                     forced_domain="example.com")
 
  57         self.verify("user-foo@whatever.example.com", "foo@example.com", 
 
  59                     forced_domain="example.com")
 
  61 class AddressParserTestCases(unittest.TestCase):
 
  64         self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
 
  65                                      "bar@lists.example.com"])
 
  67     def verify_parser(self, address, wanted_listname, wanted_parts):
 
  68         listname, parts = self.ap.parse(address)
 
  69         self.failUnlessEqual(listname, wanted_listname)
 
  70         self.failUnlessEqual(parts, wanted_parts)
 
  73         self.verify_parser("foo@example.com", 
 
  76         self.verify_parser("foo-subscribe@example.com", 
 
  79         self.verify_parser("foo-subscribe-joe=example.com@example.com", 
 
  81                            ["subscribe", "joe=example.com"])
 
  82         self.verify_parser("foo-bounce-123-ABCDEF@example.com", 
 
  84                            ["bounce", "123", "abcdef"])
 
  86 class ParseRecipientAddressBase(unittest.TestCase):
 
  89         self.lists = ["foo@example.com", 
 
  90                       "bar@lists.example.com",
 
  91                       "foo-announce@example.com"]
 
  92         self.mlm = eoc.MailingListManager(DOTDIR, lists=self.lists)
 
  94     def environ(self, sender, recipient):
 
  97             "RECIPIENT": recipient,
 
 100 class ParseUnsignedAddressTestCases(ParseRecipientAddressBase):
 
 103         self.failUnlessRaises(eoc.UnknownList,
 
 104                               self.mlm.parse_recipient_address, 
 
 107     def verify(self, address, skip_prefix, forced_domain, wanted_dict):
 
 108         dict = self.mlm.parse_recipient_address(address, skip_prefix, 
 
 110         self.failUnlessEqual(dict, wanted_dict)
 
 112     def testSimpleAddresses(self):
 
 113         self.verify("foo@example.com", 
 
 116                     { "name": "foo@example.com", "command": "post" })
 
 117         self.verify("FOO@EXAMPLE.COM", 
 
 120                     { "name": "foo@example.com", "command": "post" })
 
 121         self.verify("prefix-foo@example.com", 
 
 124                     { "name": "foo@example.com", "command": "post" })
 
 125         self.verify("bar@example.com", 
 
 128                     { "name": "bar@lists.example.com", "command": "post" })
 
 129         self.verify("prefix-bar@example.com", 
 
 132                     { "name": "bar@lists.example.com", "command": "post" })
 
 134     def testSubscription(self):
 
 135         self.verify("foo-subscribe@example.com", 
 
 138                     { "name": "foo@example.com", 
 
 139                       "command": "subscribe",
 
 142         self.verify("foo-subscribe-joe-user=example.com@example.com", 
 
 145                     { "name": "foo@example.com", 
 
 146                       "command": "subscribe",
 
 147                       "sender": "joe-user@example.com",
 
 149         self.verify("foo-unsubscribe@example.com", 
 
 152                     { "name": "foo@example.com", 
 
 153                       "command": "unsubscribe",
 
 156         self.verify("foo-unsubscribe-joe-user=example.com@example.com", 
 
 159                     { "name": "foo@example.com", 
 
 160                       "command": "unsubscribe",
 
 161                       "sender": "joe-user@example.com",
 
 165         for name in self.lists:
 
 166             self.verify(name, None, None, { "name": name, "command": "post" })
 
 168     def testSimpleCommands(self):
 
 169         for name in self.lists:
 
 170             for command in ["help", "list", "owner"]:
 
 171                 localpart, domain = name.split("@")
 
 172                 address = "%s-%s@%s" % (localpart, command, domain)
 
 173                 self.verify(address, None, None,
 
 178 class ParseWellSignedAddressTestCases(ParseRecipientAddressBase):
 
 180     def try_good_signature(self, command):
 
 181         s = "foo-announce-%s-1" % command
 
 182         hash = self.mlm.compute_hash("%s@%s" % (s, "example.com"))
 
 183         local_part = "%s-%s" % (s, hash)
 
 184         dict = self.mlm.parse_recipient_address("%s@example.com" % local_part,
 
 186         self.failUnlessEqual(dict,
 
 188                                 "name": "foo-announce@example.com",
 
 193     def testProperlySignedCommands(self):
 
 194         self.try_good_signature("subyes")
 
 195         self.try_good_signature("subapprove")
 
 196         self.try_good_signature("subreject")
 
 197         self.try_good_signature("unsubyes")
 
 198         self.try_good_signature("bounce")
 
 199         self.try_good_signature("approve")
 
 200         self.try_good_signature("reject")
 
 201         self.try_good_signature("probe")
 
 203 class ParseBadlySignedAddressTestCases(ParseRecipientAddressBase):
 
 205     def try_bad_signature(self, command_part):
 
 206         self.failUnlessRaises(eoc.BadSignature,
 
 207                               self.mlm.parse_recipient_address, 
 
 208                               "foo-announce-" + command_part + 
 
 209                                     "-123-badhash@example.com",
 
 212     def testBadlySignedCommands(self):
 
 213         self.try_bad_signature("subyes")
 
 214         self.try_bad_signature("subapprove")
 
 215         self.try_bad_signature("subreject")
 
 216         self.try_bad_signature("unsubyes")
 
 217         self.try_bad_signature("bounce")
 
 218         self.try_bad_signature("approve")
 
 219         self.try_bad_signature("reject")
 
 220         self.try_bad_signature("probe")
 
 222 class DotDirTestCases(unittest.TestCase):
 
 225         self.secret_name = os.path.join(DOTDIR, "secret")
 
 228         shutil.rmtree(DOTDIR)
 
 230     def dotdir_is_ok(self):
 
 231         self.failUnless(os.path.isdir(DOTDIR))
 
 232         self.failUnless(os.path.isfile(self.secret_name))
 
 234     def testNoDotDirExists(self):
 
 235         self.failIf(os.path.exists(DOTDIR))
 
 236         mlm = eoc.MailingListManager(DOTDIR)
 
 239     def testDotDirDoesExistButSecretDoesNot(self):
 
 240         self.failIf(os.path.exists(DOTDIR))
 
 242         self.failUnless(os.path.isdir(DOTDIR))
 
 243         self.failIf(os.path.exists(self.secret_name))
 
 244         mlm = eoc.MailingListManager(DOTDIR)
 
 248 class RemoveSomeHeadersTest(unittest.TestCase):
 
 250     def testRemoveSomeHeaders(self):
 
 251         mlm = eoc.MailingListManager(DOTDIR)
 
 252         ml = eoc.MailingList(mlm, "list@example.com")
 
 254 Header-1: this is a simple header
 
 258     complex header with a colon: yes it is
 
 259 Header-3: odd numbered headers are simple
 
 263         mail2 = ml.remove_some_headers(mail, ["Header-2"])
 
 264         self.failUnlessEqual(mail2, """\
 
 265 Header-1: this is a simple header
 
 266 Header-3: odd numbered headers are simple
 
 271 class ListBase(unittest.TestCase):
 
 274         if os.path.exists(DOTDIR):
 
 275             shutil.rmtree(DOTDIR)
 
 276         self.mlm = eoc.MailingListManager(DOTDIR)
 
 280         shutil.rmtree(DOTDIR)
 
 282 class ListCreationTestCases(ListBase):
 
 288     def listdir(self, listname):
 
 289         return os.path.join(DOTDIR, listname)
 
 291     def listdir_has_file(self, listdir, filename):
 
 292         self.failUnless(os.path.isfile(os.path.join(listdir, filename)))
 
 293         self.names.remove(filename)
 
 295     def listdir_has_dir(self, listdir, dirname):
 
 296         self.failUnless(os.path.isdir(os.path.join(listdir, dirname)))
 
 297         self.names.remove(dirname)
 
 299     def listdir_may_have_dir(self, listdir, dirname):
 
 300         if dirname in self.names:
 
 301             self.listdir_has_dir(listdir, dirname)
 
 303     def listdir_is_ok(self, listname):
 
 304         listdir = self.listdir(listname)
 
 305         self.failUnless(os.path.isdir(listdir))
 
 306         self.names = os.listdir(listdir)
 
 308         self.listdir_has_file(listdir, "config")
 
 309         self.listdir_has_file(listdir, "subscribers")
 
 311         self.listdir_has_dir(listdir, "bounce-box")
 
 312         self.listdir_has_dir(listdir, "subscription-box")
 
 314         self.listdir_may_have_dir(listdir, "moderation-box")
 
 315         self.listdir_may_have_dir(listdir, "templates")
 
 317         # Make sure there are no extras.
 
 318         self.failUnlessEqual(self.names, [])
 
 320     def testCreateNew(self):
 
 321         self.failIf(os.path.exists(self.listdir("foo@example.com")))
 
 322         ml = self.mlm.create_list("foo@example.com")
 
 324         self.failUnlessEqual(ml.__class__, eoc.MailingList)
 
 325         self.failUnlessEqual(ml.dirname, self.listdir("foo@example.com"))
 
 327         self.listdir_is_ok("foo@example.com")
 
 329     def testCreateExisting(self):
 
 330         list = self.mlm.create_list("foo@example.com")
 
 331         self.failUnlessRaises(eoc.ListExists,
 
 332                               self.mlm.create_list, "foo@example.com")
 
 333         self.listdir_is_ok("foo@example.com")
 
 335 class ListOptionTestCases(ListBase):
 
 337     def check(self, ml, wanted):
 
 338         self.failUnlessEqual(ml.cp.sections(), ["list"])
 
 340         for key, value in ml.cp.items("list"):
 
 342         self.failUnlessEqual(cpdict, wanted)
 
 344     def testDefaultOptionsOnCreateAndOpenExisting(self):
 
 345         self.mlm.create_list("foo@example.com")
 
 346         ml = self.mlm.open_list("foo@example.com")
 
 351                       "subscription": "free",
 
 354                       "mail-on-subscription-changes": "no",
 
 355                       "mail-on-forced-unsubscribe": "no",
 
 356                       "ignore-bounce": "no",
 
 358                       "pristine-headers": "",
 
 359                       "subject-prefix": "",
 
 362     def testChangeOptions(self):
 
 363         # Create a list, change some options, and save the result.
 
 364         ml = self.mlm.create_list("foo@example.com")
 
 365         self.failUnlessEqual(ml.cp.get("list", "owners"), "")
 
 366         self.failUnlessEqual(ml.cp.get("list", "posting"), "free")
 
 367         ml.cp.set("list", "owners", "owner@example.com")
 
 368         ml.cp.set("list", "posting", "moderated")
 
 371         # Re-open the list and check that the new instance has read the
 
 372         # values from the disk correctly.
 
 373         ml2 = self.mlm.open_list("foo@example.com")
 
 376                       "owners": "owner@example.com",
 
 378                       "subscription": "free",
 
 379                       "posting": "moderated",
 
 381                       "mail-on-subscription-changes": "no",
 
 382                       "mail-on-forced-unsubscribe": "no",
 
 383                       "ignore-bounce": "no",
 
 385                       "pristine-headers": "",
 
 386                       "subject-prefix": "",
 
 389 class SubscriberDatabaseTestCases(ListBase):
 
 391     def has_subscribers(self, ml, addrs):
 
 392         subs = ml.subscribers.get_all()
 
 394         self.failUnlessEqual(subs, addrs)
 
 396     def testAddAndRemoveSubscribers(self):
 
 397         addrs = ["joe@example.com", "MARY@example.com", "bubba@EXAMPLE.com"]
 
 400         ml = self.mlm.create_list("foo@example.com")
 
 401         self.failUnlessEqual(ml.subscribers.get_all(), [])
 
 403         self.failUnless(ml.subscribers.lock())
 
 404         ml.subscribers.add_many(addrs)
 
 405         self.has_subscribers(ml, addrs)
 
 406         ml.subscribers.save()
 
 407         self.failIf(ml.subscribers.locked)
 
 410         ml2 = self.mlm.open_list("foo@example.com")
 
 411         self.has_subscribers(ml2, addrs)
 
 412         ml2.subscribers.lock()
 
 413         ml2.subscribers.remove(addrs[0])
 
 414         self.has_subscribers(ml2, addrs[1:])
 
 416         ml2.subscribers.save()
 
 418         ml3 = self.mlm.open_list("foo@example.com")
 
 419         self.has_subscribers(ml3, addrs[1:])
 
 421     def testSubscribeTwice(self):
 
 422         ml = self.mlm.create_list("foo@example.com")
 
 423         self.failUnlessEqual(ml.subscribers.get_all(), [])
 
 424         ml.subscribers.lock()
 
 425         ml.subscribers.add("user@example.com")
 
 426         ml.subscribers.add("USER@example.com")
 
 427         self.failUnlessEqual(map(string.lower, ml.subscribers.get_all()),
 
 428                              map(string.lower, ["USER@example.com"]))
 
 430     def testSubscriberAttributesAndGroups(self):
 
 431         addrs = ["joe@example.com", "mary@example.com"]
 
 433         ml = self.mlm.create_list("foo@example.com")
 
 434         self.failUnlessEqual(ml.subscribers.groups(), [])
 
 435         ml.subscribers.lock()
 
 436         id = ml.subscribers.add_many(addrs)
 
 437         self.failUnlessEqual(ml.subscribers.groups(), ["0"])
 
 438         self.failUnlessEqual(ml.subscribers.get(id, "status"), "ok")
 
 439         ml.subscribers.set(id, "status", "bounced")
 
 440         self.failUnlessEqual(ml.subscribers.get(id, "status"), "bounced")
 
 441         subs = ml.subscribers.in_group(id)
 
 443         self.failUnlessEqual(subs, addrs)
 
 445     def testSubjectPrefix(self):
 
 446         ml = self.mlm.create_list("prefix@example.com")
 
 447         ml.cp.set("list", "subject-prefix", "[test]")
 
 450         self.failUnlessEqual(ml.cp.get("list", "subject-prefix"), "[test]")
 
 454 From: test2@example.com
 
 455 Subject: testing whether the subject prefix works
 
 460         prefixed_mail = ml.add_subject_prefix(mail)
 
 462         self.failUnlessEqual(prefixed_mail, """\
 
 464 From: test2@example.com
 
 465 Subject: [test] testing whether the subject prefix works
 
 472 class ModerationBoxTestCases(ListBase):
 
 474     def testModerationBox(self):
 
 475         ml = self.mlm.create_list("foo@example.com")
 
 476         listdir = os.path.join(DOTDIR, "foo@example.com")
 
 477         boxdir = os.path.join(listdir, "moderation-box")
 
 479         self.failUnlessEqual(boxdir, ml.moderation_box.boxdir)
 
 480         self.failUnless(os.path.isdir(boxdir))
 
 482         mailtext = "From: foo\nTo: bar\n\nhello\n"
 
 483         id = ml.moderation_box.add("foo", mailtext)
 
 484         self.failUnless(ml.moderation_box.has(id))
 
 485         self.failUnlessEqual(ml.moderation_box.get_address(id), "foo")
 
 486         self.failUnlessEqual(ml.moderation_box.get(id), mailtext)
 
 488         filename = os.path.join(boxdir, id)
 
 489         self.failUnless(os.path.isfile(filename))
 
 490         self.failUnless(os.path.isfile(filename + ".address"))
 
 492         ml.moderation_box.remove(id)
 
 493         self.failIf(ml.moderation_box.has(id))
 
 494         self.failUnless(not os.path.exists(filename))
 
 496 class IncomingBase(unittest.TestCase):
 
 499         if os.path.isdir(DOTDIR):
 
 500             shutil.rmtree(DOTDIR)
 
 501         self.mlm = eoc.MailingListManager(DOTDIR)
 
 503         ml = self.mlm.create_list("foo@EXAMPLE.com")
 
 504         ml.cp.set("list", "owners", "listmaster@example.com")
 
 506         ml.subscribers.lock()
 
 507         ml.subscribers.add("USER1@example.com")
 
 508         ml.subscribers.add("user2@EXAMPLE.com")
 
 509         ml.subscribers.save()
 
 510         self.write_file_in_listdir(ml, "headers-to-add", "X-Foo: foo\n")
 
 511         self.write_file_in_listdir(ml, "headers-to-remove", "Received\n")
 
 515         shutil.rmtree(DOTDIR)
 
 517     def write_file_in_listdir(self, ml, basename, contents):
 
 518         f = open(os.path.join(ml.dirname, basename), "w")
 
 522     def configure_list(self, subscription, posting):
 
 523         list = self.mlm.open_list("foo@example.com")
 
 524         list.cp.set("list", "subscription", subscription)
 
 525         list.cp.set("list", "posting", posting)
 
 528     def environ(self, sender, recipient):
 
 531             "RECIPIENT": recipient,
 
 534     def catch_sendmail(self, sender, recipients, text):
 
 535         self.sent_mail.append({
 
 537             "recipients": recipients,
 
 541     def send(self, sender, recipient, text="", force_moderation=0, 
 
 543         self.environ(sender, recipient)
 
 544         dict = self.mlm.parse_recipient_address(recipient, None, None)
 
 545         dict["force-moderation"] = force_moderation
 
 546         dict["force-posting"] = force_posting
 
 547         self.ml = self.mlm.open_list(dict["name"])
 
 548         if "\n\n" not in text:
 
 550         text = "Received: foobar\n" + text
 
 551         self.ml.read_stdin = lambda t=text: t
 
 552         self.mlm.send_mail = self.catch_sendmail
 
 556     def sender_matches(self, mail, sender):
 
 557         pat = "(?P<address>" + sender + ")"
 
 558         m = re.match(pat, mail["sender"], re.I)
 
 560             return m.group("address")
 
 564     def replyto_matches(self, mail, replyto):
 
 565         pat = "(.|\n)*(?P<address>" + replyto + ")"
 
 566         m = re.match(pat, mail["text"], re.I)
 
 568             return m.group("address")
 
 572     def receiver_matches(self, mail, recipient):
 
 573         return map(string.lower, mail["recipients"]) == [recipient.lower()]
 
 575     def body_matches(self, mail, body):
 
 577             pat = re.compile("(.|\n)*" + body + "(.|\n)*")
 
 578             m = re.match(pat, mail["text"])
 
 583     def headers_match(self, mail, header):
 
 585             pat = re.compile("(.|\n)*" + header + "(.|\n)*", re.I)
 
 586             m = re.match(pat, mail["text"])
 
 591     def match(self, sender, replyto, receiver, body=None, header=None,
 
 594         for mail in self.sent_mail:
 
 596                 m1 = self.sender_matches(mail, sender)
 
 597                 m3 = self.receiver_matches(mail, receiver)
 
 598                 m4 = self.body_matches(mail, body)
 
 599                 m5 = self.headers_match(mail, header)
 
 600                 m6 = self.headers_match(mail, anti_header)
 
 601                 no_anti_header = anti_header == None or m6 == None
 
 602                 if m1 != None and m3 and m4 and m5 and no_anti_header:
 
 604                     self.sent_mail.remove(mail)
 
 607                 m1 = self.sender_matches(mail, sender)
 
 608                 m2 = self.replyto_matches(mail, replyto)
 
 609                 m3 = self.receiver_matches(mail, receiver)
 
 610                 m4 = self.body_matches(mail, body)
 
 611                 m5 = self.headers_match(mail, header)
 
 612                 m6 = self.headers_match(mail, anti_header)
 
 613                 no_anti_header = anti_header == None or m6 == None
 
 614                 if m1 != None and m2 != None and m3 and m4 and m5 and \
 
 617                     self.sent_mail.remove(mail)
 
 619         self.failUnless(ret != None)
 
 622     def no_more_mail(self):
 
 623         self.failUnlessEqual(self.sent_mail, [])
 
 626 class SimpleCommandAddressTestCases(IncomingBase):
 
 629         self.send("outsider@example.com", "foo-help@example.com")
 
 630         self.match("foo-ignore@example.com", None, "outsider@example.com", 
 
 635         self.send("outsider@example.com", "foo-owner@example.com", "abcde")
 
 636         self.match("outsider@example.com", None, "listmaster@example.com",
 
 640     def testIgnore(self):
 
 641         self.send("outsider@example.com", "foo-ignore@example.com", "abcde")
 
 644 class OwnerCommandTestCases(IncomingBase):
 
 647         self.send("listmaster@example.com", "foo-list@example.com")
 
 648         self.match("foo-ignore@example.com", None, "listmaster@example.com",
 
 649                    "[uU][sS][eE][rR][12]@" +
 
 650                         "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n" +
 
 651                    "[uU][sS][eE][rR][12]@" +
 
 652                         "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n")
 
 655     def testListDenied(self):
 
 656         self.send("outsider@example.com", "foo-list@example.com")
 
 657         self.match("foo-ignore@example.com", None, "outsider@example.com", 
 
 658                    "Subject: Subscriber list denied")
 
 661     def testSetlist(self):
 
 662         self.send("listmaster@example.com", "foo-setlist@example.com",
 
 663                   "From: foo\n\nnew1@example.com\nuser1@example.com\n")
 
 664         a = self.match("foo-ignore@example.com", 
 
 665                        "foo-setlistyes-[^@]*@example.com", 
 
 666                        "listmaster@example.com", 
 
 667                        "Subject: Please moderate subscriber list")
 
 670         self.send("listmaster@example.com", a)
 
 671         self.match("foo-ignore@example.com", None, "listmaster@example.com",
 
 672                    "Subject: Subscriber list has been changed")
 
 673         self.match("foo-ignore@example.com", None, "new1@example.com",
 
 674                    "Subject: Welcome to")
 
 675         self.match("foo-ignore@example.com", None, "user2@EXAMPLE.com",
 
 676                    "Subject: Goodbye from")
 
 679     def testSetlistSilently(self):
 
 680         self.send("listmaster@example.com", "foo-setlistsilently@example.com",
 
 681                   "From: foo\n\nnew1@example.com\nuser1@example.com\n")
 
 682         a = self.match("foo-ignore@example.com", 
 
 683                        "foo-setlistsilentyes-[^@]*@example.com", 
 
 684                        "listmaster@example.com", 
 
 685                        "Subject: Please moderate subscriber list")
 
 688         self.send("listmaster@example.com", a)
 
 689         self.match("foo-ignore@example.com", None, "listmaster@example.com",
 
 690                    "Subject: Subscriber list has been changed")
 
 693     def testSetlistDenied(self):
 
 694         self.send("outsider@example.com", "foo-setlist@example.com",
 
 695                   "From: foo\n\nnew1@example.com\nnew2@example.com\n")
 
 696         self.match("foo-ignore@example.com", 
 
 698                    "outsider@example.com", 
 
 699                    "Subject: You can't set the subscriber list")
 
 702     def testSetlistBadlist(self):
 
 703         self.send("listmaster@example.com", "foo-setlist@example.com",
 
 704                   "From: foo\n\nBlah blah blah.\n")
 
 705         self.match("foo-ignore@example.com", 
 
 707                    "listmaster@example.com", 
 
 708                    "Subject: Bad address list")
 
 711     def testOwnerSubscribesSomeoneElse(self):
 
 712         # Send subscription request. List sends confirmation request.
 
 713         self.send("listmaster@example.com",
 
 714                   "foo-subscribe-outsider=example.com@example.com")
 
 715         a = self.match("foo-ignore@example.com", 
 
 716                        "foo-subyes-[^@]*@example.com", 
 
 717                        "listmaster@example.com",
 
 718                        "Please confirm subscription")
 
 721         # Confirm sub. req. List sends welcome.
 
 722         self.send("listmaster@example.com", a)
 
 723         self.match("foo-ignore@example.com", 
 
 725                    "outsider@example.com", 
 
 729     def testOwnerUnubscribesSomeoneElse(self):
 
 730         # Send unsubscription request. List sends confirmation request.
 
 731         self.send("listmaster@example.com",
 
 732                   "foo-unsubscribe-outsider=example.com@example.com")
 
 733         a = self.match("foo-ignore@example.com", 
 
 734                        "foo-unsubyes-[^@]*@example.com", 
 
 735                        "listmaster@example.com",
 
 736                        "Subject: Please confirm UNsubscription")
 
 739         # Confirm sub. req. List sends welcome.
 
 740         self.send("listmaster@example.com", a)
 
 741         self.match("foo-ignore@example.com", None, "outsider@example.com", 
 
 745 class SubscriptionTestCases(IncomingBase):
 
 747     def confirm(self, recipient):
 
 748         # List has sent confirmation request. Respond to it.
 
 749         a = self.match("foo-ignore@example.com", 
 
 750                        "foo-subyes-[^@]*@example.com", 
 
 752                        "Please confirm subscription")
 
 755         # Confirm sub. req. List response will be analyzed later.
 
 756         self.send("something.random@example.com", a)
 
 758     def got_welcome(self, recipient):
 
 759         self.match("foo-ignore@example.com", 
 
 765     def approve(self, user_recipient):
 
 766         self.match("foo-ignore@example.com", None, user_recipient)
 
 767         a = self.match("foo-ignore@example.com", 
 
 768                        "foo-subapprove-[^@]*@example.com",
 
 769                        "listmaster@example.com")
 
 770         self.send("listmaster@example.com", a)
 
 772     def reject(self, user_recipient):
 
 773         self.match("foo-ignore@example.com", None, user_recipient)
 
 774         a = self.match("foo-ignore@example.com", 
 
 775                        "foo-subreject-[^@]*@example.com",
 
 776                        "listmaster@example.com")
 
 777         self.send("listmaster@example.com", a)
 
 779     def testSubscribeToUnmoderatedWithoutAddressNotOnList(self):
 
 780         self.configure_list("free", "free")
 
 781         self.send("outsider@example.com", "foo-subscribe@example.com")
 
 782         self.confirm("outsider@example.com")
 
 783         self.got_welcome("outsider@example.com")
 
 785     def testSubscribeToUnmoderatedWithoutAddressAlreadyOnList(self):
 
 786         self.configure_list("free", "free")
 
 787         self.send("user1@example.com", "foo-subscribe@example.com")
 
 788         self.confirm("user1@example.com")
 
 789         self.got_welcome("user1@example.com")
 
 791     def testSubscribeToUnmoderatedWithAddressNotOnList(self):
 
 792         self.configure_list("free", "free")
 
 793         self.send("somebody.else@example.com", 
 
 794                   "foo-subscribe-outsider=example.com@example.com")
 
 795         self.confirm("outsider@example.com")
 
 796         self.got_welcome("outsider@example.com")
 
 798     def testSubscribeToUnmoderatedWithAddressAlreadyOnList(self):
 
 799         self.configure_list("free", "free")
 
 800         self.send("somebody.else@example.com", 
 
 801                   "foo-subscribe-user1=example.com@example.com")
 
 802         self.confirm("user1@example.com")
 
 803         self.got_welcome("user1@example.com")
 
 805     def testSubscribeToModeratedWithoutAddressNotOnListApproved(self):
 
 806         self.configure_list("moderated", "moderated")
 
 807         self.send("outsider@example.com", "foo-subscribe@example.com")
 
 808         self.confirm("outsider@example.com")
 
 809         self.approve("outsider@example.com")
 
 810         self.got_welcome("outsider@example.com")
 
 812     def testSubscribeToModeratedWithoutAddressNotOnListRejected(self):
 
 813         self.configure_list("moderated", "moderated")
 
 814         self.send("outsider@example.com", "foo-subscribe@example.com")
 
 815         self.confirm("outsider@example.com")
 
 816         self.reject("outsider@example.com")
 
 818     def testSubscribeToModeratedWithoutAddressAlreadyOnListApproved(self):
 
 819         self.configure_list("moderated", "moderated")
 
 820         self.send("user1@example.com", "foo-subscribe@example.com")
 
 821         self.confirm("user1@example.com")
 
 822         self.approve("user1@example.com")
 
 823         self.got_welcome("user1@example.com")
 
 825     def testSubscribeToModeratedWithoutAddressAlreadyOnListRejected(self):
 
 826         self.configure_list("moderated", "moderated")
 
 827         self.send("user1@example.com", "foo-subscribe@example.com")
 
 828         self.confirm("user1@example.com")
 
 829         self.reject("user1@example.com")
 
 831     def testSubscribeToModeratedWithAddressNotOnListApproved(self):
 
 832         self.configure_list("moderated", "moderated")
 
 833         self.send("somebody.else@example.com", 
 
 834                   "foo-subscribe-outsider=example.com@example.com")
 
 835         self.confirm("outsider@example.com")
 
 836         self.approve("outsider@example.com")
 
 837         self.got_welcome("outsider@example.com")
 
 839     def testSubscribeToModeratedWithAddressNotOnListRejected(self):
 
 840         self.configure_list("moderated", "moderated")
 
 841         self.send("somebody.else@example.com", 
 
 842                   "foo-subscribe-outsider=example.com@example.com")
 
 843         self.confirm("outsider@example.com")
 
 844         self.reject("outsider@example.com")
 
 846     def testSubscribeToModeratedWithAddressAlreadyOnListApproved(self):
 
 847         self.configure_list("moderated", "moderated")
 
 848         self.send("somebody.else@example.com", 
 
 849                   "foo-subscribe-user1=example.com@example.com")
 
 850         self.confirm("user1@example.com")
 
 851         self.approve("user1@example.com")
 
 852         self.got_welcome("user1@example.com")
 
 854     def testSubscribeToModeratedWithAddressAlreadyOnListRejected(self):
 
 855         self.configure_list("moderated", "moderated")
 
 856         self.send("somebody.else@example.com", 
 
 857                   "foo-subscribe-user1=example.com@example.com")
 
 858         self.confirm("user1@example.com")
 
 859         self.reject("user1@example.com")
 
 861 class UnsubscriptionTestCases(IncomingBase):
 
 863     def confirm(self, recipient):
 
 864         # List has sent confirmation request. Respond to it.
 
 865         a = self.match("foo-ignore@example.com", 
 
 866                        "foo-unsubyes-[^@]*@example.com", 
 
 868                        "Please confirm UNsubscription")
 
 871         # Confirm sub. req. List response will be analyzed later.
 
 872         self.send("something.random@example.com", a)
 
 874     def got_goodbye(self, recipient):
 
 875         self.match("foo-ignore@example.com", 
 
 881     def testUnsubscribeWithoutAddressNotOnList(self):
 
 882         self.send("outsider@example.com", "foo-unsubscribe@example.com")
 
 883         self.confirm("outsider@example.com")
 
 884         self.got_goodbye("outsider@example.com")
 
 886     def testUnsubscribeWithoutAddressOnList(self):
 
 887         self.send("user1@example.com", "foo-unsubscribe@example.com")
 
 888         self.confirm("user1@example.com")
 
 889         self.got_goodbye("user1@example.com")
 
 891     def testUnsubscribeWithAddressNotOnList(self):
 
 892         self.send("somebody.else@example.com", 
 
 893                   "foo-unsubscribe-outsider=example.com@example.com")
 
 894         self.confirm("outsider@example.com")
 
 895         self.got_goodbye("outsider@example.com")
 
 897     def testUnsubscribeWithAddressOnList(self):
 
 898         self.send("somebody.else@example.com", 
 
 899                   "foo-unsubscribe-user1=example.com@example.com")
 
 900         self.confirm("user1@example.com")
 
 901         self.got_goodbye("user1@example.com")
 
 903 class PostTestCases(IncomingBase):
 
 905     msg = u"Subject: something \u00c4\n\nhello, world\n".encode("utf8")
 
 907     def approve(self, user_recipient):
 
 908         self.match("foo-ignore@example.com", None, user_recipient)
 
 909         a = self.match("foo-ignore@example.com", 
 
 910                        "foo-approve-[^@]*@example.com",
 
 911                        "listmaster@example.com")
 
 912         self.send("listmaster@example.com", a)
 
 914     def reject(self, user_recipient):
 
 915         self.match("foo-ignore@example.com", None, user_recipient)
 
 916         a = self.match("foo-ignore@example.com", 
 
 917                        "foo-reject-[^@]*@example.com",
 
 918                        "listmaster@example.com")
 
 919         self.send("listmaster@example.com", a)
 
 921     def check_headers_are_encoded(self):
 
 923         for code in range(32, 127):
 
 924             ok_chars = ok_chars + chr(code)
 
 925         for mail in self.sent_mail:
 
 927             self.failUnless("\n\n" in text)
 
 928             headers = text.split("\n\n")[0]
 
 930                 if c not in ok_chars: print headers
 
 931                 self.failUnless(c in ok_chars)
 
 933     def check_mail_to_list(self):
 
 934         self.check_headers_are_encoded()
 
 935         self.match("foo-bounce-.*@example.com", None, "USER1@example.com",
 
 938                    anti_header="Received:")
 
 939         self.match("foo-bounce-.*@example.com", None, "user2@EXAMPLE.com",
 
 942                    anti_header="Received:")
 
 945     def check_that_moderation_box_is_empty(self):
 
 946         ml = self.mlm.open_list("foo@example.com")
 
 947         self.failUnlessEqual(os.listdir(ml.moderation_box.boxdir), [])
 
 949     def testSubscriberPostsToUnmoderated(self):
 
 950         self.configure_list("free", "free")
 
 951         self.send("user1@example.com", "foo@example.com", 
 
 953         self.check_mail_to_list()
 
 955     def testOutsiderPostsToUnmoderated(self):
 
 956         self.configure_list("free", "free")
 
 957         self.send("outsider@example.com", "foo@example.com", self.msg)
 
 958         self.check_mail_to_list()
 
 960     def testSubscriberPostToAutomoderated(self):
 
 961         self.configure_list("free", "auto")
 
 962         self.check_that_moderation_box_is_empty()
 
 963         self.send("user1@example.com", "foo@example.com", self.msg)
 
 964         self.check_mail_to_list()
 
 965         self.check_that_moderation_box_is_empty()
 
 967     def testOutsiderPostsToAutomoderatedRejected(self):
 
 968         self.configure_list("free", "auto")
 
 969         self.check_that_moderation_box_is_empty()
 
 970         self.send("outsider@example.com", "foo@example.com", self.msg)
 
 971         self.reject("outsider@example.com")
 
 972         self.check_that_moderation_box_is_empty()
 
 974     def testOutsiderPostsToAutomoderatedApproved(self):
 
 975         self.configure_list("free", "auto")
 
 976         self.check_that_moderation_box_is_empty()
 
 977         self.send("outsider@example.com", "foo@example.com", self.msg)
 
 978         self.approve("outsider@example.com")
 
 979         self.check_mail_to_list()
 
 980         self.check_that_moderation_box_is_empty()
 
 982     def testSubscriberPostsToModeratedRejected(self):
 
 983         self.configure_list("free", "moderated")
 
 984         self.check_that_moderation_box_is_empty()
 
 985         self.send("user1@example.com", "foo@example.com", self.msg)
 
 986         self.reject("user1@example.com")
 
 987         self.check_that_moderation_box_is_empty()
 
 989     def testOutsiderPostsToMderatedApproved(self):
 
 990         self.configure_list("free", "moderated")
 
 991         self.check_that_moderation_box_is_empty()
 
 992         self.send("outsider@example.com", "foo@example.com", self.msg)
 
 993         self.approve("outsider@example.com")
 
 994         self.check_mail_to_list()
 
 995         self.check_that_moderation_box_is_empty()
 
 997     def testSubscriberPostsWithRequestToBeModerated(self):
 
 998         self.configure_list("free", "free")
 
1000         self.check_that_moderation_box_is_empty()
 
1001         self.send("user1@example.com", "foo@example.com", self.msg,
 
1003         self.match("foo-ignore@example.com", 
 
1005                    "user1@example.com", 
 
1006                    "Subject: Please wait")
 
1007         a = self.match("foo-ignore@example.com", 
 
1008                        "foo-approve-[^@]*@example.com", 
 
1009                        "listmaster@example.com")
 
1012         self.send("listmaster@example.com", a)
 
1013         self.check_mail_to_list()
 
1014         self.check_that_moderation_box_is_empty()
 
1016     def testSubscriberPostsWithModerationOverride(self):
 
1017         self.configure_list("moderated", "moderated")
 
1018         self.send("user1@example.com", "foo@example.com", self.msg,
 
1020         self.check_mail_to_list()
 
1021         self.check_that_moderation_box_is_empty()
 
1023 class BounceTestCases(IncomingBase):
 
1025     def check_subscriber_status(self, must_be):
 
1026         ml = self.mlm.open_list("foo@example.com")
 
1027         for id in ml.subscribers.groups():
 
1028             self.failUnlessEqual(ml.subscribers.get(id, "status"), must_be)
 
1030     def bounce_sent_mail(self):
 
1031         for m in self.sent_mail[:]:
 
1032             self.send("something@example.com", m["sender"], "eek")
 
1033             self.failUnlessEqual(len(self.sent_mail), 0)
 
1035     def send_mail_to_list_then_bounce_everything(self):
 
1036         self.send("user@example.com", "foo@example.com", "hello")
 
1037         for m in self.sent_mail[:]:
 
1038             self.send("foo@example.com", m["sender"], "eek")
 
1039             self.failUnlessEqual(len(self.sent_mail), 0)
 
1041     def testBounceOnceThenRecover(self):
 
1042         self.check_subscriber_status("ok")
 
1043         self.send_mail_to_list_then_bounce_everything()
 
1045         self.check_subscriber_status("bounced")
 
1047         ml = self.mlm.open_list("foo@example.com")
 
1048         for id in ml.subscribers.groups():
 
1049             bounce_id = ml.subscribers.get(id, "bounce-id")
 
1050             self.failUnless(bounce_id)
 
1051             self.failUnless(ml.bounce_box.has(bounce_id))
 
1055         ml = self.mlm.open_list("foo@example.com")
 
1056         ml.subscribers.lock()
 
1057         for id in ml.subscribers.groups():
 
1058             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
 
1059             self.failUnless(abs(timestamp - now) < 10.0)
 
1060             ml.subscribers.set(id, "timestamp-bounced", "69.0")
 
1061             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
 
1062         ml.subscribers.save()
 
1064         self.mlm.cleaning_woman(no_op)
 
1065         self.check_subscriber_status("probed")
 
1067         for bounce_id in bounce_ids:
 
1068             self.failUnless(ml.bounce_box.has(bounce_id))
 
1070         self.mlm.cleaning_woman(no_op)
 
1071         ml = self.mlm.open_list("foo@example.com")
 
1072         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
 
1073         self.check_subscriber_status("ok")
 
1074         for bounce_id in bounce_ids:
 
1075             self.failUnless(not ml.bounce_box.has(bounce_id))
 
1077     def testBounceProbeAlso(self):
 
1078         self.check_subscriber_status("ok")
 
1079         self.send_mail_to_list_then_bounce_everything()
 
1080         self.check_subscriber_status("bounced")
 
1082         ml = self.mlm.open_list("foo@example.com")
 
1083         for id in ml.subscribers.groups():
 
1084             bounce_id = ml.subscribers.get(id, "bounce-id")
 
1085             self.failUnless(bounce_id)
 
1086             self.failUnless(ml.bounce_box.has(bounce_id))
 
1090         ml = self.mlm.open_list("foo@example.com")
 
1091         ml.subscribers.lock()
 
1092         for id in ml.subscribers.groups():
 
1093             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
 
1094             self.failUnless(abs(timestamp - now) < 10.0)
 
1095             ml.subscribers.set(id, "timestamp-bounced", "69.0")
 
1096             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
 
1097         ml.subscribers.save()
 
1100         self.mlm.cleaning_woman(self.catch_sendmail)
 
1101         self.check_subscriber_status("probed")
 
1102         for bounce_id in bounce_ids:
 
1103             self.failUnless(ml.bounce_box.has(bounce_id))
 
1104         self.bounce_sent_mail()
 
1105         self.check_subscriber_status("probebounced")
 
1107         self.mlm.cleaning_woman(no_op)
 
1108         ml = self.mlm.open_list("foo@example.com")
 
1109         self.failUnlessEqual(len(ml.subscribers.groups()), 0)
 
1110         for bounce_id in bounce_ids:
 
1111             self.failUnless(not ml.bounce_box.has(bounce_id))
 
1113     def testCleaningWomanJoinsAndBounceSplitsGroups(self):
 
1114         # Check that each group contains one address and set the creation
 
1115         # timestamp to an ancient time.
 
1116         ml = self.mlm.open_list("foo@example.com")
 
1117         bouncedir = os.path.join(ml.dirname, "bounce-box")
 
1118         ml.subscribers.lock()
 
1119         for id in ml.subscribers.groups():
 
1120             addrs = ml.subscribers.in_group(id)
 
1121             self.failUnlessEqual(len(addrs), 1)
 
1122             bounce_id = ml.subscribers.get(id, "bounce-id")
 
1123             self.failUnlessEqual(bounce_id, "..notexist..")
 
1124             bounce_id = "bounce-" + id
 
1125             ml.subscribers.set(id, "bounce-id", bounce_id)
 
1126             bounce_path = os.path.join(bouncedir, bounce_id)
 
1127             self.failUnless(not os.path.isfile(bounce_path))
 
1128             f = open(bounce_path, "w")
 
1130             f = open(bounce_path + ".address", "w")
 
1132             self.failUnless(os.path.isfile(bounce_path))
 
1133             ml.subscribers.set(id, "timestamp-created", "1")
 
1134         ml.subscribers.save()
 
1136         # Check that --cleaning-woman joins the two groups into one.
 
1137         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
 
1138         self.mlm.cleaning_woman(no_op)
 
1139         ml = self.mlm.open_list("foo@example.com")
 
1140         self.failUnlessEqual(len(ml.subscribers.groups()), 1)
 
1141         self.failUnlessEqual(os.listdir(bouncedir), [])
 
1143         # Check that a bounce splits the single group.
 
1144         self.send_mail_to_list_then_bounce_everything()
 
1145         ml = self.mlm.open_list("foo@example.com")
 
1146         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
 
1148         # Check that a --cleaning-woman immediately after doesn't join.
 
1149         # (The groups are new, thus shouldn't be joined for a week.)
 
1150         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
 
1151         self.mlm.cleaning_woman(no_op)
 
1152         ml = self.mlm.open_list("foo@example.com")
 
1153         self.failUnlessEqual(len(ml.subscribers.groups()), 2)