# Name of the scratch directory that the tests hand to
# eoc.MailingListManager and remove again with shutil.rmtree.
DOTDIR = "dot-dir-for-testing"
 
  17 class AddressCleaningTestCases(unittest.TestCase):
 
  20         self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
 
  21                                      "bar@lists.example.com"])
 
  23     def verify(self, address, wanted, skip_prefix=None, forced_domain=None):
 
  24         self.ap.set_skip_prefix(skip_prefix)
 
  25         self.ap.set_forced_domain(forced_domain)
 
  26         address = self.ap.clean(address)
 
  27         self.failUnlessEqual(address, wanted)
 
  29     def testSimpleAddress(self):
 
  30         self.verify("foo@example.com", "foo@example.com")
 
  32     def testUpperCaseAddress(self):
 
  33         self.verify("FOO@EXAMPLE.COM", "foo@example.com")
 
  35     def testPrefixRemoval(self):
 
  36         self.verify("foo@example.com", "foo@example.com", 
 
  38         self.verify("user-foo@example.com", "foo@example.com", 
 
  41     def testForcedDomain(self):
 
  42         self.verify("foo@example.com", "foo@example.com",
 
  43                     forced_domain="example.com")
 
  44         self.verify("foo@whatever.example.com", "foo@example.com", 
 
  45                     forced_domain="example.com")
 
  47     def testPrefixRemovalWithForcedDomain(self):
 
  48         self.verify("foo@example.com", "foo@example.com", 
 
  50                     forced_domain="example.com")
 
  51         self.verify("foo@whatever.example.com", "foo@example.com", 
 
  53                     forced_domain="example.com")
 
  54         self.verify("user-foo@example.com", "foo@example.com", 
 
  56                     forced_domain="example.com")
 
  57         self.verify("user-foo@whatever.example.com", "foo@example.com", 
 
  59                     forced_domain="example.com")
 
  61 class AddressParserTestCases(unittest.TestCase):
 
  64         self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
 
  65                                      "bar@lists.example.com"])
 
  67     def verify_parser(self, address, wanted_listname, wanted_parts):
 
  68         listname, parts = self.ap.parse(address)
 
  69         self.failUnlessEqual(listname, wanted_listname)
 
  70         self.failUnlessEqual(parts, wanted_parts)
 
  73         self.verify_parser("foo@example.com", 
 
  76         self.verify_parser("foo-subscribe@example.com", 
 
  79         self.verify_parser("foo-subscribe-joe=example.com@example.com", 
 
  81                            ["subscribe", "joe=example.com"])
 
  82         self.verify_parser("foo-bounce-123-ABCDEF@example.com", 
 
  84                            ["bounce", "123", "abcdef"])
 
  86 class ParseRecipientAddressBase(unittest.TestCase):
 
  89         self.lists = ["foo@example.com", 
 
  90                       "bar@lists.example.com",
 
  91                       "foo-announce@example.com"]
 
  92         self.mlm = eoc.MailingListManager(DOTDIR, lists=self.lists)
 
  94     def environ(self, sender, recipient):
 
  97             "RECIPIENT": recipient,
 
 100 class ParseUnsignedAddressTestCases(ParseRecipientAddressBase):
 
 103         self.failUnlessRaises(eoc.UnknownList,
 
 104                               self.mlm.parse_recipient_address, 
 
 107     def verify(self, address, skip_prefix, forced_domain, wanted_dict):
 
 108         dict = self.mlm.parse_recipient_address(address, skip_prefix, 
 
 110         self.failUnlessEqual(dict, wanted_dict)
 
 112     def testSimpleAddresses(self):
 
 113         self.verify("foo@example.com", 
 
 116                     { "name": "foo@example.com", "command": "post" })
 
 117         self.verify("FOO@EXAMPLE.COM", 
 
 120                     { "name": "foo@example.com", "command": "post" })
 
 121         self.verify("prefix-foo@example.com", 
 
 124                     { "name": "foo@example.com", "command": "post" })
 
 125         self.verify("bar@example.com", 
 
 128                     { "name": "bar@lists.example.com", "command": "post" })
 
 129         self.verify("prefix-bar@example.com", 
 
 132                     { "name": "bar@lists.example.com", "command": "post" })
 
 134     def testSubscription(self):
 
 135         self.verify("foo-subscribe@example.com", 
 
 138                     { "name": "foo@example.com", 
 
 139                       "command": "subscribe",
 
 142         self.verify("foo-subscribe-joe-user=example.com@example.com", 
 
 145                     { "name": "foo@example.com", 
 
 146                       "command": "subscribe",
 
 147                       "sender": "joe-user@example.com",
 
 149         self.verify("foo-unsubscribe@example.com", 
 
 152                     { "name": "foo@example.com", 
 
 153                       "command": "unsubscribe",
 
 156         self.verify("foo-unsubscribe-joe-user=example.com@example.com", 
 
 159                     { "name": "foo@example.com", 
 
 160                       "command": "unsubscribe",
 
 161                       "sender": "joe-user@example.com",
 
 165         for name in self.lists:
 
 166             self.verify(name, None, None, { "name": name, "command": "post" })
 
 168     def testSimpleCommands(self):
 
 169         for name in self.lists:
 
 170             for command in ["help", "list", "owner"]:
 
 171                 localpart, domain = name.split("@")
 
 172                 address = "%s-%s@%s" % (localpart, command, domain)
 
 173                 self.verify(address, None, None,
 
 178 class ParseWellSignedAddressTestCases(ParseRecipientAddressBase):
 
 180     def try_good_signature(self, command):
 
 181         s = "foo-announce-%s-1" % command
 
 182         hash = self.mlm.compute_hash("%s@%s" % (s, "example.com"))
 
 183         local_part = "%s-%s" % (s, hash)
 
 184         dict = self.mlm.parse_recipient_address("%s@example.com" % local_part,
 
 186         self.failUnlessEqual(dict,
 
 188                                 "name": "foo-announce@example.com",
 
 193     def testProperlySignedCommands(self):
 
 194         self.try_good_signature("subyes")
 
 195         self.try_good_signature("subapprove")
 
 196         self.try_good_signature("subreject")
 
 197         self.try_good_signature("unsubyes")
 
 198         self.try_good_signature("bounce")
 
 199         self.try_good_signature("approve")
 
 200         self.try_good_signature("reject")
 
 201         self.try_good_signature("probe")
 
 203 class ParseBadlySignedAddressTestCases(ParseRecipientAddressBase):
 
 205     def try_bad_signature(self, command_part):
 
 206         self.failUnlessRaises(eoc.BadSignature,
 
 207                               self.mlm.parse_recipient_address, 
 
 208                               "foo-announce-" + command_part + 
 
 209                                     "-123-badhash@example.com",
 
 212     def testBadlySignedCommands(self):
 
 213         self.try_bad_signature("subyes")
 
 214         self.try_bad_signature("subapprove")
 
 215         self.try_bad_signature("subreject")
 
 216         self.try_bad_signature("unsubyes")
 
 217         self.try_bad_signature("bounce")
 
 218         self.try_bad_signature("approve")
 
 219         self.try_bad_signature("reject")
 
 220         self.try_bad_signature("probe")
 
 222 class DotDirTestCases(unittest.TestCase):
 
 225         self.secret_name = os.path.join(DOTDIR, "secret")
 
 228         shutil.rmtree(DOTDIR)
 
 230     def dotdir_is_ok(self):
 
 231         self.failUnless(os.path.isdir(DOTDIR))
 
 232         self.failUnless(os.path.isfile(self.secret_name))
 
 234     def testNoDotDirExists(self):
 
 235         self.failIf(os.path.exists(DOTDIR))
 
 236         mlm = eoc.MailingListManager(DOTDIR)
 
 239     def testDotDirDoesExistButSecretDoesNot(self):
 
 240         self.failIf(os.path.exists(DOTDIR))
 
 242         self.failUnless(os.path.isdir(DOTDIR))
 
 243         self.failIf(os.path.exists(self.secret_name))
 
 244         mlm = eoc.MailingListManager(DOTDIR)
 
 247 class ListBase(unittest.TestCase):
 
 250         if os.path.exists(DOTDIR):
 
 251             shutil.rmtree(DOTDIR)
 
 252         self.mlm = eoc.MailingListManager(DOTDIR)
 
 256         shutil.rmtree(DOTDIR)
 
 258 class ListCreationTestCases(ListBase):
 
 264     def listdir(self, listname):
 
 265         return os.path.join(DOTDIR, listname)
 
 267     def listdir_has_file(self, listdir, filename):
 
 268         self.failUnless(os.path.isfile(os.path.join(listdir, filename)))
 
 269         self.names.remove(filename)
 
 271     def listdir_has_dir(self, listdir, dirname):
 
 272         self.failUnless(os.path.isdir(os.path.join(listdir, dirname)))
 
 273         self.names.remove(dirname)
 
 275     def listdir_may_have_dir(self, listdir, dirname):
 
 276         if dirname in self.names:
 
 277             self.listdir_has_dir(listdir, dirname)
 
 279     def listdir_is_ok(self, listname):
 
 280         listdir = self.listdir(listname)
 
 281         self.failUnless(os.path.isdir(listdir))
 
 282         self.names = os.listdir(listdir)
 
 284         self.listdir_has_file(listdir, "config")
 
 285         self.listdir_has_file(listdir, "subscribers")
 
 287         self.listdir_has_dir(listdir, "bounce-box")
 
 288         self.listdir_has_dir(listdir, "subscription-box")
 
 290         self.listdir_may_have_dir(listdir, "moderation-box")
 
 291         self.listdir_may_have_dir(listdir, "templates")
 
 293         # Make sure there are no extras.
 
 294         self.failUnlessEqual(self.names, [])
 
 296     def testCreateNew(self):
 
 297         self.failIf(os.path.exists(self.listdir("foo@example.com")))
 
 298         ml = self.mlm.create_list("foo@example.com")
 
 300         self.failUnlessEqual(ml.__class__, eoc.MailingList)
 
 301         self.failUnlessEqual(ml.dirname, self.listdir("foo@example.com"))
 
 303         self.listdir_is_ok("foo@example.com")
 
 305     def testCreateExisting(self):
 
 306         list = self.mlm.create_list("foo@example.com")
 
 307         self.failUnlessRaises(eoc.ListExists,
 
 308                               self.mlm.create_list, "foo@example.com")
 
 309         self.listdir_is_ok("foo@example.com")
 
 311 class ListOptionTestCases(ListBase):
 
 313     def check(self, ml, wanted):
 
 314         self.failUnlessEqual(ml.cp.sections(), ["list"])
 
 316         for key, value in ml.cp.items("list"):
 
 318         self.failUnlessEqual(cpdict, wanted)
 
 320     def testDefaultOptionsOnCreateAndOpenExisting(self):
 
 321         self.mlm.create_list("foo@example.com")
 
 322         ml = self.mlm.open_list("foo@example.com")
 
 327                       "subscription": "free",
 
 330                       "mail-on-subscription-changes": "no",
 
 331                       "mail-on-forced-unsubscribe": "no",
 
 332                       "ignore-bounce": "no",
 
 334                       "pristine-headers": "",
 
 337     def testChangeOptions(self):
 
 338         # Create a list, change some options, and save the result.
 
 339         ml = self.mlm.create_list("foo@example.com")
 
 340         self.failUnlessEqual(ml.cp.get("list", "owners"), "")
 
 341         self.failUnlessEqual(ml.cp.get("list", "posting"), "free")
 
 342         ml.cp.set("list", "owners", "owner@example.com")
 
 343         ml.cp.set("list", "posting", "moderated")
 
 346         # Re-open the list and check that the new instance has read the
 
 347         # values from the disk correctly.
 
 348         ml2 = self.mlm.open_list("foo@example.com")
 
 351                       "owners": "owner@example.com",
 
 353                       "subscription": "free",
 
 354                       "posting": "moderated",
 
 356                       "mail-on-subscription-changes": "no",
 
 357                       "mail-on-forced-unsubscribe": "no",
 
 358                       "ignore-bounce": "no",
 
 360                       "pristine-headers": "",
 
 363 class SubscriberDatabaseTestCases(ListBase):
 
 365     def has_subscribers(self, ml, addrs):
 
 366         subs = ml.subscribers.get_all()
 
 368         self.failUnlessEqual(subs, addrs)
 
 370     def testAddAndRemoveSubscribers(self):
 
 371         addrs = ["joe@example.com", "MARY@example.com", "bubba@EXAMPLE.com"]
 
 374         ml = self.mlm.create_list("foo@example.com")
 
 375         self.failUnlessEqual(ml.subscribers.get_all(), [])
 
 377         self.failUnless(ml.subscribers.lock())
 
 378         ml.subscribers.add_many(addrs)
 
 379         self.has_subscribers(ml, addrs)
 
 380         ml.subscribers.save()
 
 381         self.failIf(ml.subscribers.locked)
 
 384         ml2 = self.mlm.open_list("foo@example.com")
 
 385         self.has_subscribers(ml2, addrs)
 
 386         ml2.subscribers.lock()
 
 387         ml2.subscribers.remove(addrs[0])
 
 388         self.has_subscribers(ml2, addrs[1:])
 
 390         ml2.subscribers.save()
 
 392         ml3 = self.mlm.open_list("foo@example.com")
 
 393         self.has_subscribers(ml3, addrs[1:])
 
 395     def testSubscribeTwice(self):
 
 396         ml = self.mlm.create_list("foo@example.com")
 
 397         self.failUnlessEqual(ml.subscribers.get_all(), [])
 
 398         ml.subscribers.lock()
 
 399         ml.subscribers.add("user@example.com")
 
 400         ml.subscribers.add("USER@example.com")
 
 401         self.failUnlessEqual(map(string.lower, ml.subscribers.get_all()),
 
 402                              map(string.lower, ["USER@example.com"]))
 
 404     def testSubscriberAttributesAndGroups(self):
 
 405         addrs = ["joe@example.com", "mary@example.com"]
 
 407         ml = self.mlm.create_list("foo@example.com")
 
 408         self.failUnlessEqual(ml.subscribers.groups(), [])
 
 409         ml.subscribers.lock()
 
 410         id = ml.subscribers.add_many(addrs)
 
 411         self.failUnlessEqual(ml.subscribers.groups(), ["0"])
 
 412         self.failUnlessEqual(ml.subscribers.get(id, "status"), "ok")
 
 413         ml.subscribers.set(id, "status", "bounced")
 
 414         self.failUnlessEqual(ml.subscribers.get(id, "status"), "bounced")
 
 415         subs = ml.subscribers.in_group(id)
 
 417         self.failUnlessEqual(subs, addrs)
 
 419 class ModerationBoxTestCases(ListBase):
 
 421     def testModerationBox(self):
 
 422         ml = self.mlm.create_list("foo@example.com")
 
 423         listdir = os.path.join(DOTDIR, "foo@example.com")
 
 424         boxdir = os.path.join(listdir, "moderation-box")
 
 426         self.failUnlessEqual(boxdir, ml.moderation_box.boxdir)
 
 427         self.failUnless(os.path.isdir(boxdir))
 
 429         mailtext = "From: foo\nTo: bar\n\nhello\n"
 
 430         id = ml.moderation_box.add("foo", mailtext)
 
 431         self.failUnless(ml.moderation_box.has(id))
 
 432         self.failUnlessEqual(ml.moderation_box.get_address(id), "foo")
 
 433         self.failUnlessEqual(ml.moderation_box.get(id), mailtext)
 
 435         filename = os.path.join(boxdir, id)
 
 436         self.failUnless(os.path.isfile(filename))
 
 437         self.failUnless(os.path.isfile(filename + ".address"))
 
 439         ml.moderation_box.remove(id)
 
 440         self.failIf(ml.moderation_box.has(id))
 
 441         self.failUnless(not os.path.exists(filename))
 
 443 class IncomingBase(unittest.TestCase):
 
 446         if os.path.isdir(DOTDIR):
 
 447             shutil.rmtree(DOTDIR)
 
 448         self.mlm = eoc.MailingListManager(DOTDIR)
 
 450         ml = self.mlm.create_list("foo@EXAMPLE.com")
 
 451         ml.cp.set("list", "owners", "listmaster@example.com")
 
 453         ml.subscribers.lock()
 
 454         ml.subscribers.add("USER1@example.com")
 
 455         ml.subscribers.add("user2@EXAMPLE.com")
 
 456         ml.subscribers.save()
 
 457         self.write_file_in_listdir(ml, "headers-to-add", "X-Foo: foo\n")
 
 458         self.write_file_in_listdir(ml, "headers-to-remove", "Received\n")
 
 462         shutil.rmtree(DOTDIR)
 
 464     def write_file_in_listdir(self, ml, basename, contents):
 
 465         f = open(os.path.join(ml.dirname, basename), "w")
 
 469     def configure_list(self, subscription, posting):
 
 470         list = self.mlm.open_list("foo@example.com")
 
 471         list.cp.set("list", "subscription", subscription)
 
 472         list.cp.set("list", "posting", posting)
 
 475     def environ(self, sender, recipient):
 
 478             "RECIPIENT": recipient,
 
 481     def catch_sendmail(self, sender, recipients, text):
 
 482         self.sent_mail.append({
 
 484             "recipients": recipients,
 
 488     def send(self, sender, recipient, text="", force_moderation=0, 
 
 490         self.environ(sender, recipient)
 
 491         dict = self.mlm.parse_recipient_address(recipient, None, None)
 
 492         dict["force-moderation"] = force_moderation
 
 493         dict["force-posting"] = force_posting
 
 494         self.ml = self.mlm.open_list(dict["name"])
 
 495         if "\n\n" not in text:
 
 497         text = "Received: foobar\n" + text
 
 498         self.ml.read_stdin = lambda t=text: t
 
 499         self.mlm.send_mail = self.catch_sendmail
 
 503     def sender_matches(self, mail, sender):
 
 504         pat = "(?P<address>" + sender + ")"
 
 505         m = re.match(pat, mail["sender"], re.I)
 
 507             return m.group("address")
 
 511     def replyto_matches(self, mail, replyto):
 
 512         pat = "(.|\n)*(?P<address>" + replyto + ")"
 
 513         m = re.match(pat, mail["text"], re.I)
 
 515             return m.group("address")
 
 519     def receiver_matches(self, mail, recipient):
 
 520         return map(string.lower, mail["recipients"]) == [recipient.lower()]
 
 522     def body_matches(self, mail, body):
 
 524             pat = re.compile("(.|\n)*" + body + "(.|\n)*")
 
 525             m = re.match(pat, mail["text"])
 
 530     def headers_match(self, mail, header):
 
 532             pat = re.compile("(.|\n)*" + header + "(.|\n)*", re.I)
 
 533             m = re.match(pat, mail["text"])
 
 538     def match(self, sender, replyto, receiver, body=None, header=None,
 
 541         for mail in self.sent_mail:
 
 543                 m1 = self.sender_matches(mail, sender)
 
 544                 m3 = self.receiver_matches(mail, receiver)
 
 545                 m4 = self.body_matches(mail, body)
 
 546                 m5 = self.headers_match(mail, header)
 
 547                 m6 = self.headers_match(mail, anti_header)
 
 548                 no_anti_header = anti_header == None or m6 == None
 
 549                 if m1 != None and m3 and m4 and m5 and no_anti_header:
 
 551                     self.sent_mail.remove(mail)
 
 554                 m1 = self.sender_matches(mail, sender)
 
 555                 m2 = self.replyto_matches(mail, replyto)
 
 556                 m3 = self.receiver_matches(mail, receiver)
 
 557                 m4 = self.body_matches(mail, body)
 
 558                 m5 = self.headers_match(mail, header)
 
 559                 m6 = self.headers_match(mail, anti_header)
 
 560                 no_anti_header = anti_header == None or m6 == None
 
 561                 if m1 != None and m2 != None and m3 and m4 and m5 and \
 
 564                     self.sent_mail.remove(mail)
 
 566         self.failUnless(ret != None)
 
 569     def no_more_mail(self):
 
 570         self.failUnlessEqual(self.sent_mail, [])
 
 573 class SimpleCommandAddressTestCases(IncomingBase):
 
 576         self.send("outsider@example.com", "foo-help@example.com")
 
 577         self.match("foo-ignore@example.com", None, "outsider@example.com", 
 
 582         self.send("outsider@example.com", "foo-owner@example.com", "abcde")
 
 583         self.match("outsider@example.com", None, "listmaster@example.com",
 
 587     def testIgnore(self):
 
 588         self.send("outsider@example.com", "foo-ignore@example.com", "abcde")
 
 591 class OwnerCommandTestCases(IncomingBase):
 
 594         self.send("listmaster@example.com", "foo-list@example.com")
 
 595         self.match("foo-ignore@example.com", None, "listmaster@example.com",
 
 596                    "[uU][sS][eE][rR][12]@" +
 
 597                         "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n" +
 
 598                    "[uU][sS][eE][rR][12]@" +
 
 599                         "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n")
 
 602     def testListDenied(self):
 
 603         self.send("outsider@example.com", "foo-list@example.com")
 
 604         self.match("foo-ignore@example.com", None, "outsider@example.com", 
 
 605                    "Subject: Subscriber list denied")
 
 608     def testSetlist(self):
 
 609         self.send("listmaster@example.com", "foo-setlist@example.com",
 
 610                   "From: foo\n\nnew1@example.com\nuser1@example.com\n")
 
 611         a = self.match("foo-ignore@example.com", 
 
 612                        "foo-setlistyes-[^@]*@example.com", 
 
 613                        "listmaster@example.com", 
 
 614                        "Subject: Please moderate subscriber list")
 
 617         self.send("listmaster@example.com", a)
 
 618         self.match("foo-ignore@example.com", None, "listmaster@example.com",
 
 619                    "Subject: Subscriber list has been changed")
 
 620         self.match("foo-ignore@example.com", None, "new1@example.com",
 
 621                    "Subject: Welcome to")
 
 622         self.match("foo-ignore@example.com", None, "user2@EXAMPLE.com",
 
 623                    "Subject: Goodbye from")
 
 626     def testSetlistSilently(self):
 
 627         self.send("listmaster@example.com", "foo-setlistsilently@example.com",
 
 628                   "From: foo\n\nnew1@example.com\nuser1@example.com\n")
 
 629         a = self.match("foo-ignore@example.com", 
 
 630                        "foo-setlistsilentyes-[^@]*@example.com", 
 
 631                        "listmaster@example.com", 
 
 632                        "Subject: Please moderate subscriber list")
 
 635         self.send("listmaster@example.com", a)
 
 636         self.match("foo-ignore@example.com", None, "listmaster@example.com",
 
 637                    "Subject: Subscriber list has been changed")
 
 640     def testSetlistDenied(self):
 
 641         self.send("outsider@example.com", "foo-setlist@example.com",
 
 642                   "From: foo\n\nnew1@example.com\nnew2@example.com\n")
 
 643         self.match("foo-ignore@example.com", 
 
 645                    "outsider@example.com", 
 
 646                    "Subject: You can't set the subscriber list")
 
 649     def testSetlistBadlist(self):
 
 650         self.send("listmaster@example.com", "foo-setlist@example.com",
 
 651                   "From: foo\n\nBlah blah blah.\n")
 
 652         self.match("foo-ignore@example.com", 
 
 654                    "listmaster@example.com", 
 
 655                    "Subject: Bad address list")
 
 658     def testOwnerSubscribesSomeoneElse(self):
 
 659         # Send subscription request. List sends confirmation request.
 
 660         self.send("listmaster@example.com",
 
 661                   "foo-subscribe-outsider=example.com@example.com")
 
 662         a = self.match("foo-ignore@example.com", 
 
 663                        "foo-subyes-[^@]*@example.com", 
 
 664                        "listmaster@example.com",
 
 665                        "Please confirm subscription")
 
 668         # Confirm sub. req. List sends welcome.
 
 669         self.send("listmaster@example.com", a)
 
 670         self.match("foo-ignore@example.com", 
 
 672                    "outsider@example.com", 
 
 676     def testOwnerUnubscribesSomeoneElse(self):
 
 677         # Send unsubscription request. List sends confirmation request.
 
 678         self.send("listmaster@example.com",
 
 679                   "foo-unsubscribe-outsider=example.com@example.com")
 
 680         a = self.match("foo-ignore@example.com", 
 
 681                        "foo-unsubyes-[^@]*@example.com", 
 
 682                        "listmaster@example.com",
 
 683                        "Subject: Please confirm UNsubscription")
 
 686         # Confirm sub. req. List sends welcome.
 
 687         self.send("listmaster@example.com", a)
 
 688         self.match("foo-ignore@example.com", None, "outsider@example.com", 
 
 692 class SubscriptionTestCases(IncomingBase):
 
 694     def confirm(self, recipient):
 
 695         # List has sent confirmation request. Respond to it.
 
 696         a = self.match("foo-ignore@example.com", 
 
 697                        "foo-subyes-[^@]*@example.com", 
 
 699                        "Please confirm subscription")
 
 702         # Confirm sub. req. List response will be analyzed later.
 
 703         self.send("something.random@example.com", a)
 
 705     def got_welcome(self, recipient):
 
 706         self.match("foo-ignore@example.com", 
 
 712     def approve(self, user_recipient):
 
 713         self.match("foo-ignore@example.com", None, user_recipient)
 
 714         a = self.match("foo-ignore@example.com", 
 
 715                        "foo-subapprove-[^@]*@example.com",
 
 716                        "listmaster@example.com")
 
 717         self.send("listmaster@example.com", a)
 
 719     def reject(self, user_recipient):
 
 720         self.match("foo-ignore@example.com", None, user_recipient)
 
 721         a = self.match("foo-ignore@example.com", 
 
 722                        "foo-subreject-[^@]*@example.com",
 
 723                        "listmaster@example.com")
 
 724         self.send("listmaster@example.com", a)
 
 726     def testSubscribeToUnmoderatedWithoutAddressNotOnList(self):
 
 727         self.configure_list("free", "free")
 
 728         self.send("outsider@example.com", "foo-subscribe@example.com")
 
 729         self.confirm("outsider@example.com")
 
 730         self.got_welcome("outsider@example.com")
 
 732     def testSubscribeToUnmoderatedWithoutAddressAlreadyOnList(self):
 
 733         self.configure_list("free", "free")
 
 734         self.send("user1@example.com", "foo-subscribe@example.com")
 
 735         self.confirm("user1@example.com")
 
 736         self.got_welcome("user1@example.com")
 
 738     def testSubscribeToUnmoderatedWithAddressNotOnList(self):
 
 739         self.configure_list("free", "free")
 
 740         self.send("somebody.else@example.com", 
 
 741                   "foo-subscribe-outsider=example.com@example.com")
 
 742         self.confirm("outsider@example.com")
 
 743         self.got_welcome("outsider@example.com")
 
 745     def testSubscribeToUnmoderatedWithAddressAlreadyOnList(self):
 
 746         self.configure_list("free", "free")
 
 747         self.send("somebody.else@example.com", 
 
 748                   "foo-subscribe-user1=example.com@example.com")
 
 749         self.confirm("user1@example.com")
 
 750         self.got_welcome("user1@example.com")
 
 752     def testSubscribeToModeratedWithoutAddressNotOnListApproved(self):
 
 753         self.configure_list("moderated", "moderated")
 
 754         self.send("outsider@example.com", "foo-subscribe@example.com")
 
 755         self.confirm("outsider@example.com")
 
 756         self.approve("outsider@example.com")
 
 757         self.got_welcome("outsider@example.com")
 
 759     def testSubscribeToModeratedWithoutAddressNotOnListRejected(self):
 
 760         self.configure_list("moderated", "moderated")
 
 761         self.send("outsider@example.com", "foo-subscribe@example.com")
 
 762         self.confirm("outsider@example.com")
 
 763         self.reject("outsider@example.com")
 
 765     def testSubscribeToModeratedWithoutAddressAlreadyOnListApproved(self):
 
 766         self.configure_list("moderated", "moderated")
 
 767         self.send("user1@example.com", "foo-subscribe@example.com")
 
 768         self.confirm("user1@example.com")
 
 769         self.approve("user1@example.com")
 
 770         self.got_welcome("user1@example.com")
 
 772     def testSubscribeToModeratedWithoutAddressAlreadyOnListRejected(self):
 
 773         self.configure_list("moderated", "moderated")
 
 774         self.send("user1@example.com", "foo-subscribe@example.com")
 
 775         self.confirm("user1@example.com")
 
 776         self.reject("user1@example.com")
 
 778     def testSubscribeToModeratedWithAddressNotOnListApproved(self):
 
 779         self.configure_list("moderated", "moderated")
 
 780         self.send("somebody.else@example.com", 
 
 781                   "foo-subscribe-outsider=example.com@example.com")
 
 782         self.confirm("outsider@example.com")
 
 783         self.approve("outsider@example.com")
 
 784         self.got_welcome("outsider@example.com")
 
 786     def testSubscribeToModeratedWithAddressNotOnListRejected(self):
 
 787         self.configure_list("moderated", "moderated")
 
 788         self.send("somebody.else@example.com", 
 
 789                   "foo-subscribe-outsider=example.com@example.com")
 
 790         self.confirm("outsider@example.com")
 
 791         self.reject("outsider@example.com")
 
 793     def testSubscribeToModeratedWithAddressAlreadyOnListApproved(self):
 
 794         self.configure_list("moderated", "moderated")
 
 795         self.send("somebody.else@example.com", 
 
 796                   "foo-subscribe-user1=example.com@example.com")
 
 797         self.confirm("user1@example.com")
 
 798         self.approve("user1@example.com")
 
 799         self.got_welcome("user1@example.com")
 
 801     def testSubscribeToModeratedWithAddressAlreadyOnListRejected(self):
 
 802         self.configure_list("moderated", "moderated")
 
 803         self.send("somebody.else@example.com", 
 
 804                   "foo-subscribe-user1=example.com@example.com")
 
 805         self.confirm("user1@example.com")
 
 806         self.reject("user1@example.com")
 
 808 class UnsubscriptionTestCases(IncomingBase):
 
 810     def confirm(self, recipient):
 
 811         # List has sent confirmation request. Respond to it.
 
 812         a = self.match("foo-ignore@example.com", 
 
 813                        "foo-unsubyes-[^@]*@example.com", 
 
 815                        "Please confirm UNsubscription")
 
 818         # Confirm sub. req. List response will be analyzed later.
 
 819         self.send("something.random@example.com", a)
 
 821     def got_goodbye(self, recipient):
 
 822         self.match("foo-ignore@example.com", 
 
 828     def testUnsubscribeWithoutAddressNotOnList(self):
 
 829         self.send("outsider@example.com", "foo-unsubscribe@example.com")
 
 830         self.confirm("outsider@example.com")
 
 831         self.got_goodbye("outsider@example.com")
 
 833     def testUnsubscribeWithoutAddressOnList(self):
 
 834         self.send("user1@example.com", "foo-unsubscribe@example.com")
 
 835         self.confirm("user1@example.com")
 
 836         self.got_goodbye("user1@example.com")
 
 838     def testUnsubscribeWithAddressNotOnList(self):
 
 839         self.send("somebody.else@example.com", 
 
 840                   "foo-unsubscribe-outsider=example.com@example.com")
 
 841         self.confirm("outsider@example.com")
 
 842         self.got_goodbye("outsider@example.com")
 
 844     def testUnsubscribeWithAddressOnList(self):
 
 845         self.send("somebody.else@example.com", 
 
 846                   "foo-unsubscribe-user1=example.com@example.com")
 
 847         self.confirm("user1@example.com")
 
 848         self.got_goodbye("user1@example.com")
 
 850 class PostTestCases(IncomingBase):
 
 852     msg = u"Subject: something \u00c4\n\nhello, world\n".encode("utf8")
 
 854     def approve(self, user_recipient):
 
 855         self.match("foo-ignore@example.com", None, user_recipient)
 
 856         a = self.match("foo-ignore@example.com", 
 
 857                        "foo-approve-[^@]*@example.com",
 
 858                        "listmaster@example.com")
 
 859         self.send("listmaster@example.com", a)
 
 861     def reject(self, user_recipient):
 
 862         self.match("foo-ignore@example.com", None, user_recipient)
 
 863         a = self.match("foo-ignore@example.com", 
 
 864                        "foo-reject-[^@]*@example.com",
 
 865                        "listmaster@example.com")
 
 866         self.send("listmaster@example.com", a)
 
 868     def check_headers_are_encoded(self):
 
             # Fails unless the header part (the text before the first blank
             # line) of each mail in self.sent_mail contains only characters
             # present in ok_chars.
             # NOTE(review): this listing is missing the lines that
             # initialise ok_chars, bind `text`, and open the loop over `c`;
             # confirm them against the real file.
             #
             # Append chr(32) .. chr(126) to ok_chars.
 870         for code in range(32, 127):
 
 871             ok_chars = ok_chars + chr(code)
 
 872         for mail in self.sent_mail:
 
                 # A mail must contain a blank line; everything before the
                 # first one is taken as the headers.
 874             self.failUnless("\n\n" in text)
 
 875             headers = text.split("\n\n")[0]
 
                     # Print the offending headers (Python 2 print statement)
                     # before the assertion below fails.
 877                 if c not in ok_chars: print headers
 
 878                 self.failUnless(c in ok_chars)
 
 880     def check_mail_to_list(self):
 
             # Runs the header-encoding check, then calls self.match twice
             # with a foo-bounce-.*@example.com pattern: once with
             # "USER1@example.com" and once with "user2@EXAMPLE.com".
             # NOTE(review): two argument lines of each match call are
             # missing from this listing.  anti_header presumably names a
             # header that must not appear in the mail -- confirm against
             # match().
 881         self.check_headers_are_encoded()
 
 882         self.match("foo-bounce-.*@example.com", None, "USER1@example.com",
 
 885                    anti_header="Received:")
 
 886         self.match("foo-bounce-.*@example.com", None, "user2@EXAMPLE.com",
 
 889                    anti_header="Received:")
 
 892     def check_that_moderation_box_is_empty(self):
 
 893         ml = self.mlm.open_list("foo@example.com")
 
 894         self.failUnlessEqual(os.listdir(ml.moderation_box.boxdir), [])
 
 896     def testSubscriberPostsToUnmoderated(self):
 
             # With the list configured as ("free", "free"), user1 posts to
             # foo@example.com and the post must pass check_mail_to_list().
             # NOTE(review): the continuation line of the send call (its last
             # argument) is missing from this listing.
 897         self.configure_list("free", "free")
 
 898         self.send("user1@example.com", "foo@example.com", 
 
 900         self.check_mail_to_list()
 
 902     def testOutsiderPostsToUnmoderated(self):
 
 903         self.configure_list("free", "free")
 
 904         self.send("outsider@example.com", "foo@example.com", self.msg)
 
 905         self.check_mail_to_list()
 
 907     def testSubscriberPostToAutomoderated(self):
 
 908         self.configure_list("free", "auto")
 
 909         self.check_that_moderation_box_is_empty()
 
 910         self.send("user1@example.com", "foo@example.com", self.msg)
 
 911         self.check_mail_to_list()
 
 912         self.check_that_moderation_box_is_empty()
 
 914     def testOutsiderPostsToAutomoderatedRejected(self):
 
 915         self.configure_list("free", "auto")
 
 916         self.check_that_moderation_box_is_empty()
 
 917         self.send("outsider@example.com", "foo@example.com", self.msg)
 
 918         self.reject("outsider@example.com")
 
 919         self.check_that_moderation_box_is_empty()
 
 921     def testOutsiderPostsToAutomoderatedApproved(self):
 
 922         self.configure_list("free", "auto")
 
 923         self.check_that_moderation_box_is_empty()
 
 924         self.send("outsider@example.com", "foo@example.com", self.msg)
 
 925         self.approve("outsider@example.com")
 
 926         self.check_mail_to_list()
 
 927         self.check_that_moderation_box_is_empty()
 
 929     def testSubscriberPostsToModeratedRejected(self):
 
 930         self.configure_list("free", "moderated")
 
 931         self.check_that_moderation_box_is_empty()
 
 932         self.send("user1@example.com", "foo@example.com", self.msg)
 
 933         self.reject("user1@example.com")
 
 934         self.check_that_moderation_box_is_empty()
 
 936     def testOutsiderPostsToMderatedApproved(self):
 
 937         self.configure_list("free", "moderated")
 
 938         self.check_that_moderation_box_is_empty()
 
 939         self.send("outsider@example.com", "foo@example.com", self.msg)
 
 940         self.approve("outsider@example.com")
 
 941         self.check_mail_to_list()
 
 942         self.check_that_moderation_box_is_empty()
 
 944     def testSubscriberPostsWithRequestToBeModerated(self):
 
             # With the list configured as ("free", "free"), user1 posts
             # self.msg with an extra argument to send().  A "Please wait"
             # mail is matched, the approve address is matched in the mail to
             # the listmaster, and a message from the listmaster to that
             # address is followed by the delivery and empty-moderation-box
             # checks.
             # NOTE(review): several lines are missing from this listing: the
             # line after configure_list, the last argument of the first send
             # call, the middle arguments of the first match call, and the
             # lines before the second send.  The extra send() argument
             # presumably carries the moderation request -- confirm against
             # the real file.
 945         self.configure_list("free", "free")
 
 947         self.check_that_moderation_box_is_empty()
 
 948         self.send("user1@example.com", "foo@example.com", self.msg,
 
 950         self.match("foo-ignore@example.com", 
 
 953                    "Subject: Please wait")
 
             # Pick up the approve address and answer it as the listmaster.
 954         a = self.match("foo-ignore@example.com", 
 
 955                        "foo-approve-[^@]*@example.com", 
 
 956                        "listmaster@example.com")
 
 959         self.send("listmaster@example.com", a)
 
 960         self.check_mail_to_list()
 
 961         self.check_that_moderation_box_is_empty()
 
 963     def testSubscriberPostsWithModerationOverride(self):
 
             # With the list configured as ("moderated", "moderated"), user1
             # posts self.msg with an extra argument to send(); the post must
             # pass check_mail_to_list() and leave the moderation box empty.
             # NOTE(review): the continuation line holding the extra send()
             # argument is missing from this listing; per the test name it
             # presumably overrides moderation -- confirm against the real
             # file.
 964         self.configure_list("moderated", "moderated")
 
 965         self.send("user1@example.com", "foo@example.com", self.msg,
 
 967         self.check_mail_to_list()
 
 968         self.check_that_moderation_box_is_empty()
 
 970 class BounceTestCases(IncomingBase):
         # Tests of how the subscriber "status" field of foo@example.com
         # changes when sent mail is bounced and when the list manager's
         # cleaning_woman() is run.
 
 972     def check_subscriber_status(self, must_be):
 
 973         ml = self.mlm.open_list("foo@example.com")
 
 974         for id in ml.subscribers.groups():
 
 975             self.failUnlessEqual(ml.subscribers.get(id, "status"), must_be)
 
 977     def bounce_sent_mail(self):
 
 978         for m in self.sent_mail[:]:
 
 979             self.send("something@example.com", m["sender"], "eek")
 
 980             self.failUnlessEqual(len(self.sent_mail), 0)
 
 982     def send_mail_to_list_then_bounce_everything(self):
 
 983         self.send("user@example.com", "foo@example.com", "hello")
 
 984         for m in self.sent_mail[:]:
 
 985             self.send("foo@example.com", m["sender"], "eek")
 
 986             self.failUnlessEqual(len(self.sent_mail), 0)
 
 988     def testBounceOnceThenRecover(self):
 
             # Status sequence checked here: "ok" -> "bounced" (after
             # bouncing everything) -> "probed" (first cleaning_woman run)
             # -> "ok" (second cleaning_woman run).
             # NOTE(review): the lines binding `now` and `bounce_ids` are
             # missing from this listing, and `no_op` is defined outside this
             # view.
 989         self.check_subscriber_status("ok")
 
 990         self.send_mail_to_list_then_bounce_everything()
 
 992         self.check_subscriber_status("bounced")
 
             # Every group must have a non-empty bounce-id that the bounce
             # box knows about.
 994         ml = self.mlm.open_list("foo@example.com")
 
 995         for id in ml.subscribers.groups():
 
 996             bounce_id = ml.subscribers.get(id, "bounce-id")
 
 997             self.failUnless(bounce_id)
 
 998             self.failUnless(ml.bounce_box.has(bounce_id))
 
             # Check that each bounce timestamp is within 10 seconds of
             # `now`, overwrite it with "69.0", and remember each group's
             # bounce-id.
             # NOTE(review): the old timestamp is presumably there so the
             # next cleaning_woman run treats the bounce as old -- confirm.
1002         ml = self.mlm.open_list("foo@example.com")
 
1003         ml.subscribers.lock()
 
1004         for id in ml.subscribers.groups():
 
1005             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
 
1006             self.failUnless(abs(timestamp - now) < 10.0)
 
1007             ml.subscribers.set(id, "timestamp-bounced", "69.0")
 
1008             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
 
1009         ml.subscribers.save()
 
             # First cleaning run: status becomes "probed" and the bounce
             # ids are still in the bounce box.  The check uses the `ml`
             # object opened before the run.
1011         self.mlm.cleaning_woman(no_op)
 
1012         self.check_subscriber_status("probed")
 
1014         for bounce_id in bounce_ids:
 
1015             self.failUnless(ml.bounce_box.has(bounce_id))
 
             # Second cleaning run: two groups remain, status is back to
             # "ok", and the bounce ids are gone from the bounce box.
1017         self.mlm.cleaning_woman(no_op)
 
1018         ml = self.mlm.open_list("foo@example.com")
 
1019         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
 
1020         self.check_subscriber_status("ok")
 
1021         for bounce_id in bounce_ids:
 
1022             self.failUnless(not ml.bounce_box.has(bounce_id))
 
1024     def testBounceProbeAlso(self):
 
             # Status sequence checked here: "ok" -> "bounced" -> "probed"
             # (cleaning_woman run) -> "probebounced" (after bouncing the
             # mail sent during that run) -> no subscriber groups left after
             # a further cleaning_woman run.
             # NOTE(review): the lines binding `now` and `bounce_ids` are
             # missing from this listing, and `no_op` is defined outside this
             # view.
1025         self.check_subscriber_status("ok")
 
1026         self.send_mail_to_list_then_bounce_everything()
 
1027         self.check_subscriber_status("bounced")
 
             # Every group must have a non-empty bounce-id that the bounce
             # box knows about.
1029         ml = self.mlm.open_list("foo@example.com")
 
1030         for id in ml.subscribers.groups():
 
1031             bounce_id = ml.subscribers.get(id, "bounce-id")
 
1032             self.failUnless(bounce_id)
 
1033             self.failUnless(ml.bounce_box.has(bounce_id))
 
             # Check that each bounce timestamp is within 10 seconds of
             # `now`, overwrite it with "69.0", and remember each group's
             # bounce-id.
1037         ml = self.mlm.open_list("foo@example.com")
 
1038         ml.subscribers.lock()
 
1039         for id in ml.subscribers.groups():
 
1040             timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
 
1041             self.failUnless(abs(timestamp - now) < 10.0)
 
1042             ml.subscribers.set(id, "timestamp-bounced", "69.0")
 
1043             bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
 
1044         ml.subscribers.save()
 
             # This cleaning run is given self.catch_sendmail instead of
             # no_op.
             # NOTE(review): presumably so the mail it sends lands in
             # self.sent_mail for bounce_sent_mail() below -- confirm.
1047         self.mlm.cleaning_woman(self.catch_sendmail)
 
1048         self.check_subscriber_status("probed")
 
1049         for bounce_id in bounce_ids:
 
1050             self.failUnless(ml.bounce_box.has(bounce_id))
 
1051         self.bounce_sent_mail()
 
1052         self.check_subscriber_status("probebounced")
 
             # Final cleaning run: no subscriber groups remain and the
             # bounce ids are gone from the bounce box.
1054         self.mlm.cleaning_woman(no_op)
 
1055         ml = self.mlm.open_list("foo@example.com")
 
1056         self.failUnlessEqual(len(ml.subscribers.groups()), 0)
 
1057         for bounce_id in bounce_ids:
 
1058             self.failUnless(not ml.bounce_box.has(bounce_id))
 
1060     def testCleaningWomanJoinsAndBounceSplitsGroups(self):
 
             # NOTE(review): `no_op` is defined outside this view, and the
             # line following each open() call below is missing from this
             # listing (presumably the matching close) -- confirm against the
             # real file.
1061         # Check that each group contains one address and set the creation
 
1062         # timestamp to an ancient time.
 
1063         ml = self.mlm.open_list("foo@example.com")
 
1064         bouncedir = os.path.join(ml.dirname, "bounce-box")
 
1065         ml.subscribers.lock()
 
1066         for id in ml.subscribers.groups():
 
1067             addrs = ml.subscribers.in_group(id)
 
1068             self.failUnlessEqual(len(addrs), 1)
 
                 # The group starts with the placeholder bounce-id
                 # "..notexist.."; replace it with "bounce-<group id>".
1069             bounce_id = ml.subscribers.get(id, "bounce-id")
 
1070             self.failUnlessEqual(bounce_id, "..notexist..")
 
1071             bounce_id = "bounce-" + id
 
1072             ml.subscribers.set(id, "bounce-id", bounce_id)
 
                 # Create the bounce file and its ".address" companion in the
                 # bounce-box directory; the bounce file must not exist
                 # beforehand.
1073             bounce_path = os.path.join(bouncedir, bounce_id)
 
1074             self.failUnless(not os.path.isfile(bounce_path))
 
1075             f = open(bounce_path, "w")
 
1077             f = open(bounce_path + ".address", "w")
 
1079             self.failUnless(os.path.isfile(bounce_path))
 
1080             ml.subscribers.set(id, "timestamp-created", "1")
 
1081         ml.subscribers.save()
 
1083         # Check that --cleaning-woman joins the two groups into one.
 
1084         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
 
1085         self.mlm.cleaning_woman(no_op)
 
1086         ml = self.mlm.open_list("foo@example.com")
 
1087         self.failUnlessEqual(len(ml.subscribers.groups()), 1)
 
             # The bounce-box directory must be empty after the run.
1088         self.failUnlessEqual(os.listdir(bouncedir), [])
 
1090         # Check that a bounce splits the single group.
 
1091         self.send_mail_to_list_then_bounce_everything()
 
1092         ml = self.mlm.open_list("foo@example.com")
 
1093         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
 
1095         # Check that a --cleaning-woman immediately after doesn't join.
 
1096         # (The groups are new, thus shouldn't be joined for a week.)
 
1097         self.failUnlessEqual(len(ml.subscribers.groups()), 2)
 
1098         self.mlm.cleaning_woman(no_op)
 
1099         ml = self.mlm.open_list("foo@example.com")
 
1100         self.failUnlessEqual(len(ml.subscribers.groups()), 2)