import unittest
import shutil
import os
import re
import time
import string

import eoc

# Scratch directory handed to eoc.MailingListManager by the tests in this
# file; several test cases delete it again in setUp/tearDown.
DOTDIR = "dot-dir-for-testing"
# Override the module-level DOTDIR setting inside eoc with the same path.
eoc.DOTDIR = DOTDIR
# NOTE(review): presumably silences eoc's own output during the test run --
# confirm against the eoc module.
eoc.quiet = 1

def no_op(*args):
    """Accept any positional arguments, ignore them, and return None."""
    return None

class AddressCleaningTestCases(unittest.TestCase):
    """Exercise eoc.AddressParser.clean() with and without a skip prefix
    and a forced domain."""

    def setUp(self):
        # The known list names deliberately use mixed case.
        self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
                                     "bar@lists.example.com"])

    def verify(self, address, wanted, skip_prefix=None, forced_domain=None):
        """Check that cleaning `address` yields `wanted`.

        skip_prefix and forced_domain are handed to the parser before the
        address is cleaned; both default to None, which is passed through
        unchanged.
        """
        self.ap.set_skip_prefix(skip_prefix)
        self.ap.set_forced_domain(forced_domain)
        # assertEqual replaces the deprecated failUnlessEqual alias.
        self.assertEqual(self.ap.clean(address), wanted)

    def testSimpleAddress(self):
        self.verify("foo@example.com", "foo@example.com")

    def testUpperCaseAddress(self):
        # An all-upper-case address must come back all lower case.
        self.verify("FOO@EXAMPLE.COM", "foo@example.com")

    def testPrefixRemoval(self):
        # The prefix is stripped when present and harmless when absent.
        for address in ["foo@example.com", "user-foo@example.com"]:
            self.verify(address, "foo@example.com", skip_prefix="user-")

    def testForcedDomain(self):
        # Whatever domain the address arrives with, the forced one wins.
        for address in ["foo@example.com", "foo@whatever.example.com"]:
            self.verify(address, "foo@example.com",
                        forced_domain="example.com")

    def testPrefixRemovalWithForcedDomain(self):
        # All four combinations of (prefix present?, foreign domain?).
        addresses = [
            "foo@example.com",
            "foo@whatever.example.com",
            "user-foo@example.com",
            "user-foo@whatever.example.com",
        ]
        for address in addresses:
            self.verify(address, "foo@example.com",
                        skip_prefix="user-",
                        forced_domain="example.com")

class AddressParserTestCases(unittest.TestCase):
    """Exercise eoc.AddressParser.parse(), which splits a recipient address
    into the list name and the command parts that follow it."""

    def setUp(self):
        self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
                                     "bar@lists.example.com"])

    def verify_parser(self, address, wanted_listname, wanted_parts):
        """Parse `address` and compare both halves of the result.

        wanted_listname is the list name exactly as it was given to the
        parser in setUp; wanted_parts is the expected list of command
        parts.
        """
        listname, parts = self.ap.parse(address)
        # assertEqual replaces the deprecated failUnlessEqual alias.
        self.assertEqual(listname, wanted_listname)
        self.assertEqual(parts, wanted_parts)

    def testParser(self):
        # (address, expected command parts); every address belongs to the
        # list registered as "foo@EXAMPLE.com".
        cases = [
            ("foo@example.com", []),
            ("foo-subscribe@example.com", ["subscribe"]),
            ("foo-subscribe-joe=example.com@example.com",
             ["subscribe", "joe=example.com"]),
            # Command parts are expected back in lower case.
            ("foo-bounce-123-ABCDEF@example.com",
             ["bounce", "123", "abcdef"]),
        ]
        for address, wanted_parts in cases:
            self.verify_parser(address, "foo@EXAMPLE.com", wanted_parts)

class ParseRecipientAddressBase(unittest.TestCase):
    """Common fixture for the recipient-address parsing tests: a
    MailingListManager that knows three list names."""

    def setUp(self):
        known_lists = ["foo@example.com",
                       "bar@lists.example.com",
                       "foo-announce@example.com"]
        # The same list object is kept on the test case and handed to the
        # manager.
        self.lists = known_lists
        self.mlm = eoc.MailingListManager(DOTDIR, lists=known_lists)

    def environ(self, sender, recipient):
        """Install SENDER and RECIPIENT as the environment eoc sees."""
        variables = {}
        variables["SENDER"] = sender
        variables["RECIPIENT"] = recipient
        eoc.set_environ(variables)

class ParseUnsignedAddressTestCases(ParseRecipientAddressBase):
    """Parsing of recipient addresses whose commands carry no signature."""

    def testEmpty(self):
        # An empty address cannot name any list.
        self.assertRaises(eoc.UnknownList,
                          self.mlm.parse_recipient_address,
                          "", None, None)

    def verify(self, address, skip_prefix, forced_domain, wanted_dict):
        """Parse `address` with the given prefix/domain settings and compare
        the resulting dictionary with `wanted_dict`."""
        parsed = self.mlm.parse_recipient_address(address, skip_prefix,
                                                  forced_domain)
        self.assertEqual(parsed, wanted_dict)

    def testSimpleAddresses(self):
        # (address, skip_prefix, forced_domain, expected list name); a bare
        # list address always means "post".
        cases = [
            ("foo@example.com", None, None, "foo@example.com"),
            ("FOO@EXAMPLE.COM", None, None, "foo@example.com"),
            ("prefix-foo@example.com", "prefix-", None, "foo@example.com"),
            ("bar@example.com", None, "lists.example.com",
             "bar@lists.example.com"),
            ("prefix-bar@example.com", "prefix-", "lists.example.com",
             "bar@lists.example.com"),
        ]
        for address, skip_prefix, forced_domain, name in cases:
            self.verify(address, skip_prefix, forced_domain,
                        {"name": name, "command": "post"})

    def testSubscription(self):
        for command in ["subscribe", "unsubscribe"]:
            # Without an embedded address the sender is reported as empty.
            self.verify("foo-%s@example.com" % command,
                        None,
                        None,
                        {"name": "foo@example.com",
                         "command": command,
                         "sender": "",
                        })
            # An embedded address has its "=" turned back into "@".
            self.verify("foo-%s-joe-user=example.com@example.com" % command,
                        None,
                        None,
                        {"name": "foo@example.com",
                         "command": command,
                         "sender": "joe-user@example.com",
                        })

    def testPost(self):
        for name in self.lists:
            self.verify(name, None, None, {"name": name, "command": "post"})

    def testSimpleCommands(self):
        for name in self.lists:
            localpart, domain = name.split("@")
            for command in ["help", "list", "owner"]:
                address = "%s-%s@%s" % (localpart, command, domain)
                self.verify(address, None, None,
                            {"name": name, "command": command})

class ParseWellSignedAddressTestCases(ParseRecipientAddressBase):
    """Signed command addresses with a correct signature must parse."""

    def try_good_signature(self, command):
        """Build a correctly signed `command` address for the foo-announce
        list (id "1") and check the parsed result."""
        signed_part = "foo-announce-%s-1" % command
        # The signature is computed over the unsigned address.
        signature = self.mlm.compute_hash("%s@%s" %
                                          (signed_part, "example.com"))
        recipient = "%s-%s@example.com" % (signed_part, signature)
        parsed = self.mlm.parse_recipient_address(recipient, None, None)
        self.assertEqual(parsed,
                         {
                             "name": "foo-announce@example.com",
                             "command": command,
                             "id": "1",
                         })

    def testProperlySignedCommands(self):
        signed_commands = ["subyes", "subapprove", "subreject", "unsubyes",
                           "bounce", "approve", "reject", "probe"]
        for command in signed_commands:
            self.try_good_signature(command)

class ParseBadlySignedAddressTestCases(ParseRecipientAddressBase):
    """Signed command addresses with a bogus signature must be refused."""

    def try_bad_signature(self, command_part):
        """Check that `command_part` followed by an id and the fake hash
        "badhash" raises eoc.BadSignature."""
        address = "foo-announce-" + command_part + "-123-badhash@example.com"
        self.assertRaises(eoc.BadSignature,
                          self.mlm.parse_recipient_address,
                          address,
                          None, None)

    def testBadlySignedCommands(self):
        signed_commands = ["subyes", "subapprove", "subreject", "unsubyes",
                           "bounce", "approve", "reject", "probe"]
        for command in signed_commands:
            self.try_bad_signature(command)

class DotDirTestCases(unittest.TestCase):
    """Creating a MailingListManager must leave behind the dot directory
    and a "secret" file inside it."""

    def setUp(self):
        self.secret_name = os.path.join(DOTDIR, "secret")
        # Start from a clean slate even if an earlier test (or an aborted
        # run) left the directory behind.
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)

    def tearDown(self):
        # Guarded so that a manager which failed to create the directory
        # shows up as the real assertion failure, not as an rmtree error.
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)

    def dotdir_is_ok(self):
        """Assert that the dot directory and its secret file both exist."""
        self.assertTrue(os.path.isdir(DOTDIR))
        self.assertTrue(os.path.isfile(self.secret_name))

    def testNoDotDirExists(self):
        self.assertFalse(os.path.exists(DOTDIR))
        # Only the side effects of construction matter here.
        eoc.MailingListManager(DOTDIR)
        self.dotdir_is_ok()

    def testDotDirDoesExistButSecretDoesNot(self):
        self.assertFalse(os.path.exists(DOTDIR))
        os.makedirs(DOTDIR)
        self.assertTrue(os.path.isdir(DOTDIR))
        self.assertFalse(os.path.exists(self.secret_name))
        # Only the side effects of construction matter here.
        eoc.MailingListManager(DOTDIR)
        self.dotdir_is_ok()


class RemoveSomeHeadersTest(unittest.TestCase):
    """MailingList.remove_some_headers() must drop a named header together
    with all of its continuation lines."""

    def tearDown(self):
        # Constructing the manager creates the dot directory; remove it so
        # that it does not leak into tests that expect it to be absent.
        if os.path.exists(DOTDIR):
            shutil.rmtree(DOTDIR)

    def testRemoveSomeHeaders(self):
        mlm = eoc.MailingListManager(DOTDIR)
        ml = eoc.MailingList(mlm, "list@example.com")
        # Header-2 spans several lines, one of which contains a colon and
        # could be mistaken for a new header.
        mail = """\
Header-1: this is a simple header
Header-2: this
    is
    a
    complex header with a colon: yes it is
Header-3: odd numbered headers are simple

Body.
"""
        mail2 = ml.remove_some_headers(mail, ["Header-2"])
        self.assertEqual(mail2, """\
Header-1: this is a simple header
Header-3: odd numbered headers are simple

Body.
""")

class ListBase(unittest.TestCase):
    """Fixture giving each test a MailingListManager on a fresh dot
    directory, which is deleted again afterwards."""

    def setUp(self):
        # Throw away anything a previous test may have left behind.
        stale = os.path.exists(DOTDIR)
        if stale:
            shutil.rmtree(DOTDIR)
        self.mlm = eoc.MailingListManager(DOTDIR)

    def tearDown(self):
        # Drop the manager reference before deleting its directory.
        self.mlm = None
        shutil.rmtree(DOTDIR)

class ListCreationTestCases(ListBase):
    """Creating a list must produce a list directory with exactly the
    expected files and subdirectories."""

    def setUp(self):
        ListBase.setUp(self)
        # Directory entries not yet accounted for; filled by listdir_is_ok.
        self.names = None

    def listdir(self, listname):
        """Return the path of the directory holding `listname`."""
        return os.path.join(DOTDIR, listname)

    def listdir_has_file(self, listdir, filename):
        """Assert `filename` is a regular file in `listdir` and tick it off."""
        self.assertTrue(os.path.isfile(os.path.join(listdir, filename)))
        self.names.remove(filename)

    def listdir_has_dir(self, listdir, dirname):
        """Assert `dirname` is a directory in `listdir` and tick it off."""
        self.assertTrue(os.path.isdir(os.path.join(listdir, dirname)))
        self.names.remove(dirname)

    def listdir_may_have_dir(self, listdir, dirname):
        """Like listdir_has_dir, but only when the entry exists at all."""
        if dirname in self.names:
            self.listdir_has_dir(listdir, dirname)

    def listdir_is_ok(self, listname):
        """Check the complete contents of the directory for `listname`."""
        listdir = self.listdir(listname)
        self.assertTrue(os.path.isdir(listdir))
        self.names = os.listdir(listdir)

        # Mandatory entries.
        for filename in ["config", "subscribers"]:
            self.listdir_has_file(listdir, filename)
        for dirname in ["bounce-box", "subscription-box"]:
            self.listdir_has_dir(listdir, dirname)

        # Optional entries.
        for dirname in ["moderation-box", "templates"]:
            self.listdir_may_have_dir(listdir, dirname)

        # Make sure there are no extras.
        self.assertEqual(self.names, [])

    def testCreateNew(self):
        self.assertFalse(os.path.exists(self.listdir("foo@example.com")))
        ml = self.mlm.create_list("foo@example.com")

        self.assertEqual(ml.__class__, eoc.MailingList)
        self.assertEqual(ml.dirname, self.listdir("foo@example.com"))

        self.listdir_is_ok("foo@example.com")

    def testCreateExisting(self):
        self.mlm.create_list("foo@example.com")
        # A second creation must fail and leave the first list intact.
        self.assertRaises(eoc.ListExists,
                          self.mlm.create_list, "foo@example.com")
        self.listdir_is_ok("foo@example.com")

class ListOptionTestCases(ListBase):
    """Tests for the options kept in the "list" section of a list's
    configuration."""

    def _default_options(self):
        """Return a fresh dict of the options a new list is expected to
        have."""
        return {
            "owners": "",
            "moderators": "",
            "subscription": "free",
            "posting": "free",
            "archived": "no",
            "mail-on-subscription-changes": "no",
            "mail-on-forced-unsubscribe": "no",
            "ignore-bounce": "no",
            "language": "",
            "pristine-headers": "",
            "subject-prefix": "",
        }

    def check(self, ml, wanted):
        """Assert that `ml` has only a "list" section and that its options
        equal the dict `wanted`."""
        self.assertEqual(ml.cp.sections(), ["list"])
        self.assertEqual(dict(ml.cp.items("list")), wanted)

    def testDefaultOptionsOnCreateAndOpenExisting(self):
        self.mlm.create_list("foo@example.com")
        ml = self.mlm.open_list("foo@example.com")
        self.check(ml, self._default_options())

    def testChangeOptions(self):
        # Create a list, change some options, and save the result.
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.cp.get("list", "owners"), "")
        self.assertEqual(ml.cp.get("list", "posting"), "free")
        ml.cp.set("list", "owners", "owner@example.com")
        ml.cp.set("list", "posting", "moderated")
        ml.save_config()

        # Re-open the list and check that the new instance has read the
        # values from the disk correctly.
        ml2 = self.mlm.open_list("foo@example.com")
        wanted = self._default_options()
        wanted["owners"] = "owner@example.com"
        wanted["posting"] = "moderated"
        self.check(ml2, wanted)

class SubscriberDatabaseTestCases(ListBase):
    """Tests for a list's subscriber database, plus the subject-prefix
    option."""

    def has_subscribers(self, ml, addrs):
        """Assert that the subscribers of `ml`, once sorted, equal `addrs`
        (which the caller must pass in sorted order)."""
        subs = ml.subscribers.get_all()
        subs.sort()
        self.assertEqual(subs, addrs)

    def testAddAndRemoveSubscribers(self):
        addrs = ["joe@example.com", "MARY@example.com", "bubba@EXAMPLE.com"]
        addrs.sort()

        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.get_all(), [])

        self.assertTrue(ml.subscribers.lock())
        ml.subscribers.add_many(addrs)
        self.has_subscribers(ml, addrs)
        ml.subscribers.save()
        # Saving is expected to release the lock.
        self.assertFalse(ml.subscribers.locked)
        ml = None

        # A freshly opened list must see what was saved.
        ml2 = self.mlm.open_list("foo@example.com")
        self.has_subscribers(ml2, addrs)
        ml2.subscribers.lock()
        ml2.subscribers.remove(addrs[0])
        self.has_subscribers(ml2, addrs[1:])

        ml2.subscribers.save()

        ml3 = self.mlm.open_list("foo@example.com")
        self.has_subscribers(ml3, addrs[1:])

    def testSubscribeTwice(self):
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.get_all(), [])
        ml.subscribers.lock()
        # The same address in two spellings must be stored only once.
        ml.subscribers.add("user@example.com")
        ml.subscribers.add("USER@example.com")
        # str.lower() replaces the deprecated string.lower function.
        stored = [addr.lower() for addr in ml.subscribers.get_all()]
        self.assertEqual(stored, ["user@example.com"])

    def testSubscriberAttributesAndGroups(self):
        addrs = ["joe@example.com", "mary@example.com"]
        addrs.sort()
        ml = self.mlm.create_list("foo@example.com")
        self.assertEqual(ml.subscribers.groups(), [])
        ml.subscribers.lock()
        # add_many returns the id of the group the addresses were put in.
        group_id = ml.subscribers.add_many(addrs)
        self.assertEqual(ml.subscribers.groups(), ["0"])
        self.assertEqual(ml.subscribers.get(group_id, "status"), "ok")
        ml.subscribers.set(group_id, "status", "bounced")
        self.assertEqual(ml.subscribers.get(group_id, "status"), "bounced")
        subs = ml.subscribers.in_group(group_id)
        subs.sort()
        self.assertEqual(subs, addrs)

    def testSubjectPrefix(self):
        ml = self.mlm.create_list("prefix@example.com")
        ml.cp.set("list", "subject-prefix", "[test]")
        ml.save_config()

        self.assertEqual(ml.cp.get("list", "subject-prefix"), "[test]")

        mail = """\
To: test@example.com
From: test2@example.com
Subject: testing whether the subject prefix works
Precedence: bulk

Body.
"""
        prefixed_mail = ml.add_subject_prefix(mail)

        # Only the Subject line may change.
        self.assertEqual(prefixed_mail, """\
To: test@example.com
From: test2@example.com
Subject: [test] testing whether the subject prefix works
Precedence: bulk

Body.
""")


class ModerationBoxTestCases(ListBase):
    """Round trip of one message through a list's moderation box."""

    def testModerationBox(self):
        ml = self.mlm.create_list("foo@example.com")
        listdir = os.path.join(DOTDIR, "foo@example.com")
        boxdir = os.path.join(listdir, "moderation-box")

        self.assertEqual(boxdir, ml.moderation_box.boxdir)
        self.assertTrue(os.path.isdir(boxdir))

        # Store a message and read it back through the box API.
        mailtext = "From: foo\nTo: bar\n\nhello\n"
        msg_id = ml.moderation_box.add("foo", mailtext)
        self.assertTrue(ml.moderation_box.has(msg_id))
        self.assertEqual(ml.moderation_box.get_address(msg_id), "foo")
        self.assertEqual(ml.moderation_box.get(msg_id), mailtext)

        # On disk the message is a file named after its id, accompanied by
        # an ".address" file.
        filename = os.path.join(boxdir, msg_id)
        self.assertTrue(os.path.isfile(filename))
        self.assertTrue(os.path.isfile(filename + ".address"))

        ml.moderation_box.remove(msg_id)
        self.assertFalse(ml.moderation_box.has(msg_id))
        self.assertFalse(os.path.exists(filename))

class IncomingBase(unittest.TestCase):
    """Fixture for tests that hand incoming mail to a list and then inspect
    the mail the list tried to send in response.

    setUp creates the list "foo@EXAMPLE.com" with one owner and two
    subscribers.  send() delivers a message to the list while capturing
    outgoing mail in self.sent_mail; match() and no_more_mail() then check
    what was captured.
    """

    def setUp(self):
        if os.path.isdir(DOTDIR):
            shutil.rmtree(DOTDIR)
        self.mlm = eoc.MailingListManager(DOTDIR)
        self.ml = None
        ml = self.mlm.create_list("foo@EXAMPLE.com")
        ml.cp.set("list", "owners", "listmaster@example.com")
        ml.save_config()
        ml.subscribers.lock()
        ml.subscribers.add("USER1@example.com")
        ml.subscribers.add("user2@EXAMPLE.com")
        ml.subscribers.save()
        self.write_file_in_listdir(ml, "headers-to-add", "X-Foo: foo\n")
        self.write_file_in_listdir(ml, "headers-to-remove", "Received\n")
        self.sent_mail = []

    def tearDown(self):
        shutil.rmtree(DOTDIR)

    def write_file_in_listdir(self, ml, basename, contents):
        """Write `contents` to the file `basename` in the directory of
        `ml`, replacing any existing file."""
        f = open(os.path.join(ml.dirname, basename), "w")
        try:
            f.write(contents)
        finally:
            # Close the file even when the write fails.
            f.close()

    def configure_list(self, subscription, posting):
        """Set and save the subscription and posting options of the test
        list."""
        ml = self.mlm.open_list("foo@example.com")
        ml.cp.set("list", "subscription", subscription)
        ml.cp.set("list", "posting", posting)
        ml.save_config()

    def environ(self, sender, recipient):
        """Install SENDER and RECIPIENT as the environment eoc sees."""
        eoc.set_environ({
            "SENDER": sender,
            "RECIPIENT": recipient,
        })

    def catch_sendmail(self, sender, recipients, text):
        """Stand-in for the manager's send_mail: record the message in
        self.sent_mail instead of sending it."""
        self.sent_mail.append({
            "sender": sender,
            "recipients": recipients,
            "text": text,
        })

    def send(self, sender, recipient, text="", force_moderation=0,
             force_posting=0):
        """Deliver a message from `sender` to the address `recipient`.

        `text` is the message; when it contains no blank line, one is put
        in front of it so that it becomes the body.  A "Received:" header
        is always prepended.  Mail captured from earlier calls is
        discarded, so afterwards self.sent_mail holds only the mail caused
        by this message.
        """
        self.environ(sender, recipient)
        request = self.mlm.parse_recipient_address(recipient, None, None)
        request["force-moderation"] = force_moderation
        request["force-posting"] = force_posting
        self.ml = self.mlm.open_list(request["name"])
        if "\n\n" not in text:
            text = "\n\n" + text
        text = "Received: foobar\n" + text
        # Feed the message to the list instead of real standard input and
        # capture outgoing mail instead of sending it.
        self.ml.read_stdin = lambda t=text: t
        self.mlm.send_mail = self.catch_sendmail
        self.sent_mail = []
        self.ml.obey(request)

    def sender_matches(self, mail, sender):
        """Return the part of the envelope sender of `mail` matched by the
        regular expression `sender` (anchored at the start, ignoring
        case), or None."""
        m = re.match("(?P<address>" + sender + ")", mail["sender"], re.I)
        if m is None:
            return None
        return m.group("address")

    def replyto_matches(self, mail, replyto):
        """Return the text matched by the regular expression `replyto`
        anywhere in the text of `mail` (ignoring case), or None."""
        # "." does not match a newline, hence the explicit alternative.
        pat = "(.|\n)*(?P<address>" + replyto + ")"
        m = re.match(pat, mail["text"], re.I)
        if m is None:
            return None
        return m.group("address")

    def receiver_matches(self, mail, recipient):
        """Return whether `mail` went to exactly one recipient equal to
        `recipient`, ignoring case."""
        # A list comprehension with str.lower() replaces map(string.lower,
        # ...), which relies on a deprecated function and on map()
        # returning a list.
        actual = [addr.lower() for addr in mail["recipients"]]
        return actual == [recipient.lower()]

    def body_matches(self, mail, body):
        """Return a true value if the regular expression `body` occurs in
        the text of `mail` (case-sensitive), or if `body` is empty or None;
        return None otherwise."""
        if not body:
            return 1
        pat = re.compile("(.|\n)*" + body + "(.|\n)*")
        return re.match(pat, mail["text"])

    def headers_match(self, mail, header):
        """Like body_matches, but ignoring case."""
        if not header:
            return 1
        pat = re.compile("(.|\n)*" + header + "(.|\n)*", re.I)
        return re.match(pat, mail["text"])

    def match(self, sender, replyto, receiver, body=None, header=None,
              anti_header=None):
        """Find, remove and report the first captured mail that fits.

        A mail fits when its sender matches `sender`, its only recipient is
        `receiver`, its text matches `body` and `header` (when given), and
        its text does NOT match `anti_header` (when given).  When `replyto`
        is not None the text must match it as well.

        The test fails if no captured mail fits.  Returns the matched
        `replyto` text when `replyto` was given, else the matched sender.
        """
        found = None
        for mail in self.sent_mail:
            sender_addr = self.sender_matches(mail, sender)
            if sender_addr is None:
                continue
            if replyto is None:
                wanted_addr = sender_addr
            else:
                wanted_addr = self.replyto_matches(mail, replyto)
                if wanted_addr is None:
                    continue
            if not self.receiver_matches(mail, receiver):
                continue
            if not self.body_matches(mail, body):
                continue
            if not self.headers_match(mail, header):
                continue
            if (anti_header is not None and
                    self.headers_match(mail, anti_header) is not None):
                continue
            found = wanted_addr
            # Safe although we are iterating: the loop ends right here.
            self.sent_mail.remove(mail)
            break
        self.assertTrue(found is not None)
        return found

    def no_more_mail(self):
        """Assert that every captured mail has been consumed by match()."""
        self.assertEqual(self.sent_mail, [])


class SimpleCommandAddressTestCases(IncomingBase):
    """Commands that anybody may send: help, owner and ignore."""

    def testHelp(self):
        # A help request is answered to the person who asked.
        requester = "outsider@example.com"
        self.send(requester, "foo-help@example.com")
        self.match("foo-ignore@example.com", None, requester,
                   "Subject: Help for")
        self.no_more_mail()

    def testOwner(self):
        # Mail to the owner address is passed on to the list owner with the
        # original sender kept.
        writer = "outsider@example.com"
        self.send(writer, "foo-owner@example.com", "abcde")
        self.match(writer, None, "listmaster@example.com", "abcde")
        self.no_more_mail()

    def testIgnore(self):
        # Mail to the ignore address causes no mail at all.
        self.send("outsider@example.com", "foo-ignore@example.com", "abcde")
        self.no_more_mail()

class OwnerCommandTestCases(IncomingBase):
    """Commands reserved for the list owner, and their refusal when an
    outsider sends them."""

    def _notice(self, recipient, body):
        # Expect a mail from the ignore address to `recipient` whose text
        # matches `body`.
        return self.match("foo-ignore@example.com", None, recipient, body)

    def _confirmation(self, replyto, body):
        # Expect exactly one confirmation request to the owner and return
        # the address that confirms it.
        address = self.match("foo-ignore@example.com",
                             replyto,
                             "listmaster@example.com",
                             body)
        self.no_more_mail()
        return address

    def testList(self):
        self.send("listmaster@example.com", "foo-list@example.com")
        # One line per subscriber, in either order and in any letter case.
        subscriber_line = ("[uU][sS][eE][rR][12]@" +
                           "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n")
        self._notice("listmaster@example.com",
                     subscriber_line + subscriber_line)
        self.no_more_mail()

    def testListDenied(self):
        self.send("outsider@example.com", "foo-list@example.com")
        self._notice("outsider@example.com",
                     "Subject: Subscriber list denied")
        self.no_more_mail()

    def testSetlist(self):
        # The owner submits a new subscriber list and is asked to confirm.
        self.send("listmaster@example.com", "foo-setlist@example.com",
                  "From: foo\n\nnew1@example.com\nuser1@example.com\n")
        a = self._confirmation("foo-setlistyes-[^@]*@example.com",
                               "Subject: Please moderate subscriber list")

        # After confirmation the owner is notified, the new subscriber is
        # welcomed and the dropped one gets a goodbye.
        self.send("listmaster@example.com", a)
        self._notice("listmaster@example.com",
                     "Subject: Subscriber list has been changed")
        self._notice("new1@example.com", "Subject: Welcome to")
        self._notice("user2@EXAMPLE.com", "Subject: Goodbye from")
        self.no_more_mail()

    def testSetlistSilently(self):
        self.send("listmaster@example.com", "foo-setlistsilently@example.com",
                  "From: foo\n\nnew1@example.com\nuser1@example.com\n")
        a = self._confirmation("foo-setlistsilentyes-[^@]*@example.com",
                               "Subject: Please moderate subscriber list")

        # The silent variant notifies the owner only.
        self.send("listmaster@example.com", a)
        self._notice("listmaster@example.com",
                     "Subject: Subscriber list has been changed")
        self.no_more_mail()

    def testSetlistDenied(self):
        self.send("outsider@example.com", "foo-setlist@example.com",
                  "From: foo\n\nnew1@example.com\nnew2@example.com\n")
        self._notice("outsider@example.com",
                     "Subject: You can't set the subscriber list")
        self.no_more_mail()

    def testSetlistBadlist(self):
        # The body is prose, not a list of addresses.
        self.send("listmaster@example.com", "foo-setlist@example.com",
                  "From: foo\n\nBlah blah blah.\n")
        self._notice("listmaster@example.com", "Subject: Bad address list")
        self.no_more_mail()

    def testOwnerSubscribesSomeoneElse(self):
        # Send subscription request. List sends confirmation request to
        # the owner.
        self.send("listmaster@example.com",
                  "foo-subscribe-outsider=example.com@example.com")
        a = self._confirmation("foo-subyes-[^@]*@example.com",
                               "Please confirm subscription")

        # Confirm the request. List sends welcome to the new subscriber.
        self.send("listmaster@example.com", a)
        self._notice("outsider@example.com", "Welcome to the")
        self.no_more_mail()

    def testOwnerUnubscribesSomeoneElse(self):
        # Send unsubscription request. List sends confirmation request to
        # the owner.
        self.send("listmaster@example.com",
                  "foo-unsubscribe-outsider=example.com@example.com")
        a = self._confirmation("foo-unsubyes-[^@]*@example.com",
                               "Subject: Please confirm UNsubscription")

        # Confirm the request. List sends goodbye to the removed address.
        self.send("listmaster@example.com", a)
        self._notice("outsider@example.com", "Goodbye")
        self.no_more_mail()

class SubscriptionTestCases(IncomingBase):
    """Subscription requests to free and moderated lists, made either for
    the sender's own address or for an address embedded in the command."""

    def confirm(self, recipient):
        # The list must have sent exactly one confirmation request, to
        # `recipient`.
        confirm_address = self.match("foo-ignore@example.com",
                                     "foo-subyes-[^@]*@example.com",
                                     recipient,
                                     "Please confirm subscription")
        self.no_more_mail()

        # Answer it from an unrelated address; the caller analyzes the
        # list's response.
        self.send("something.random@example.com", confirm_address)

    def got_welcome(self, recipient):
        # The only remaining mail must be the welcome to `recipient`.
        self.match("foo-ignore@example.com", None, recipient,
                   "Welcome to the")
        self.no_more_mail()

    def _moderate(self, user_recipient, replyto):
        # Consume the mail sent to the user, then answer the owner's
        # moderation request at the address matching `replyto`.
        self.match("foo-ignore@example.com", None, user_recipient)
        verdict_address = self.match("foo-ignore@example.com",
                                     replyto,
                                     "listmaster@example.com")
        self.send("listmaster@example.com", verdict_address)

    def approve(self, user_recipient):
        self._moderate(user_recipient, "foo-subapprove-[^@]*@example.com")

    def reject(self, user_recipient):
        self._moderate(user_recipient, "foo-subreject-[^@]*@example.com")

    def _request_for_self(self, mode, who):
        # `who` asks to be subscribed and confirms.
        self.configure_list(mode, mode)
        self.send(who, "foo-subscribe@example.com")
        self.confirm(who)

    def _request_for_other(self, mode, command_address, who):
        # Somebody else asks for `who` to be subscribed; `who` confirms.
        self.configure_list(mode, mode)
        self.send("somebody.else@example.com", command_address)
        self.confirm(who)

    def testSubscribeToUnmoderatedWithoutAddressNotOnList(self):
        self._request_for_self("free", "outsider@example.com")
        self.got_welcome("outsider@example.com")

    def testSubscribeToUnmoderatedWithoutAddressAlreadyOnList(self):
        self._request_for_self("free", "user1@example.com")
        self.got_welcome("user1@example.com")

    def testSubscribeToUnmoderatedWithAddressNotOnList(self):
        self._request_for_other(
            "free",
            "foo-subscribe-outsider=example.com@example.com",
            "outsider@example.com")
        self.got_welcome("outsider@example.com")

    def testSubscribeToUnmoderatedWithAddressAlreadyOnList(self):
        self._request_for_other(
            "free",
            "foo-subscribe-user1=example.com@example.com",
            "user1@example.com")
        self.got_welcome("user1@example.com")

    def testSubscribeToModeratedWithoutAddressNotOnListApproved(self):
        self._request_for_self("moderated", "outsider@example.com")
        self.approve("outsider@example.com")
        self.got_welcome("outsider@example.com")

    def testSubscribeToModeratedWithoutAddressNotOnListRejected(self):
        self._request_for_self("moderated", "outsider@example.com")
        self.reject("outsider@example.com")

    def testSubscribeToModeratedWithoutAddressAlreadyOnListApproved(self):
        self._request_for_self("moderated", "user1@example.com")
        self.approve("user1@example.com")
        self.got_welcome("user1@example.com")

    def testSubscribeToModeratedWithoutAddressAlreadyOnListRejected(self):
        self._request_for_self("moderated", "user1@example.com")
        self.reject("user1@example.com")

    def testSubscribeToModeratedWithAddressNotOnListApproved(self):
        self._request_for_other(
            "moderated",
            "foo-subscribe-outsider=example.com@example.com",
            "outsider@example.com")
        self.approve("outsider@example.com")
        self.got_welcome("outsider@example.com")

    def testSubscribeToModeratedWithAddressNotOnListRejected(self):
        self._request_for_other(
            "moderated",
            "foo-subscribe-outsider=example.com@example.com",
            "outsider@example.com")
        self.reject("outsider@example.com")

    def testSubscribeToModeratedWithAddressAlreadyOnListApproved(self):
        self._request_for_other(
            "moderated",
            "foo-subscribe-user1=example.com@example.com",
            "user1@example.com")
        self.approve("user1@example.com")
        self.got_welcome("user1@example.com")

    def testSubscribeToModeratedWithAddressAlreadyOnListRejected(self):
        self._request_for_other(
            "moderated",
            "foo-subscribe-user1=example.com@example.com",
            "user1@example.com")
        self.reject("user1@example.com")

class UnsubscriptionTestCases(IncomingBase):

    """Unsubscription requests sent with and without an explicit address.

    Every test mails an unsubscribe command address, answers the
    confirmation request addressed to the affected user, and then expects
    a goodbye message for that user.
    """

    def confirm(self, recipient):
        """Answer the unsubscription confirmation request sent to recipient.

        The value self.match() returns for the "foo-unsubyes-..." pattern is
        used as the address the confirmation is mailed to.  Whatever the
        list sends in response is left for the caller to examine.
        """
        confirmation_address = self.match("foo-ignore@example.com",
                                          "foo-unsubyes-[^@]*@example.com",
                                          recipient,
                                          "Please confirm UNsubscription")
        # NOTE(review): no_more_mail() presumably asserts that the request
        # above was the only outgoing mail -- confirm against its definition.
        self.no_more_mail()

        # The confirmation may come from any sender address.
        self.send("something.random@example.com", confirmation_address)

    def got_goodbye(self, recipient):
        """Match a "Goodbye from" mail for recipient, then no_more_mail()."""
        list_sender = "foo-ignore@example.com"
        self.match(list_sender, None, recipient, "Goodbye from")
        self.no_more_mail()

    def testUnsubscribeWithoutAddressNotOnList(self):
        # The sender asks to be unsubscribed; the address to remove is the
        # sender's own.  The goodbye is expected for outsider@example.com.
        user = "outsider@example.com"
        self.send(user, "foo-unsubscribe@example.com")
        self.confirm(user)
        self.got_goodbye(user)

    def testUnsubscribeWithoutAddressOnList(self):
        # Same flow as above, requested by user1@example.com.
        user = "user1@example.com"
        self.send(user, "foo-unsubscribe@example.com")
        self.confirm(user)
        self.got_goodbye(user)

    def testUnsubscribeWithAddressNotOnList(self):
        # A third party names the address to remove inside the command
        # address; confirmation and goodbye go to the named address.
        user = "outsider@example.com"
        command = "foo-unsubscribe-outsider=example.com@example.com"
        self.send("somebody.else@example.com", command)
        self.confirm(user)
        self.got_goodbye(user)

    def testUnsubscribeWithAddressOnList(self):
        # As above, with user1@example.com named in the command address.
        user = "user1@example.com"
        command = "foo-unsubscribe-user1=example.com@example.com"
        self.send("somebody.else@example.com", command)
        self.confirm(user)
        self.got_goodbye(user)

class PostTestCases(IncomingBase):

    """Posting to the list under free, auto-moderated and moderated policies.

    The tests post self.msg to foo@example.com, drive the moderation
    exchange where one is expected, and check what reaches the list.
    """

    # UTF-8 encoded message whose Subject header contains a non-ASCII
    # character (U+00C4).  check_mail_to_list() verifies that no such raw
    # character is left in the headers of the caught outgoing mail.
    msg = u"Subject: something \u00c4\n\nhello, world\n".encode("utf8")

    # Characters that may appear literally in a header: tab, CR, LF and
    # printable ASCII (codes 32..126).  Built once rather than on every
    # call of check_headers_are_encoded().
    _ok_header_chars = "\t\r\n" + "".join(map(chr, range(32, 127)))

    def _answer_moderation_request(self, user_recipient, verdict):
        # Shared body of approve() and reject().  verdict is the command
        # word ("approve" or "reject") expected in the address that
        # self.match() returns for the mail to the listmaster.
        #
        # First consume the mail the list sent to the poster, then find the
        # moderation request and answer it as the listmaster.
        self.match("foo-ignore@example.com", None, user_recipient)
        reply_address = self.match("foo-ignore@example.com",
                                   "foo-%s-[^@]*@example.com" % verdict,
                                   "listmaster@example.com")
        self.send("listmaster@example.com", reply_address)

    def approve(self, user_recipient):
        """Approve the pending posting as listmaster@example.com.

        user_recipient is the address the list is expected to have mailed
        about the held posting.
        """
        self._answer_moderation_request(user_recipient, "approve")

    def reject(self, user_recipient):
        """Reject the pending posting as listmaster@example.com.

        user_recipient is the address the list is expected to have mailed
        about the held posting.
        """
        self._answer_moderation_request(user_recipient, "reject")

    def check_headers_are_encoded(self):
        """Fail if any caught mail has a disallowed raw character in headers.

        Headers are everything before the first blank line of each mail in
        self.sent_mail.  Only tab, CR, LF and printable ASCII are allowed.
        The offending characters and the headers are reported in the
        assertion message instead of being printed to stdout.
        """
        for mail in self.sent_mail:
            text = mail["text"]
            self.failUnless("\n\n" in text)
            headers = text.split("\n\n")[0]
            bad = [c for c in headers if c not in self._ok_header_chars]
            self.failUnless(not bad,
                            "unencoded characters %r in headers:\n%s" %
                            (bad, headers))

    def check_mail_to_list(self):
        """Check that the posting was delivered to both list addresses.

        Headers must be clean, each delivery must match the expected body
        and X-Foo header and must not match "Received:", and
        no_more_mail() is called afterwards.
        """
        self.check_headers_are_encoded()
        # NOTE(review): the differing letter case in the addresses and in
        # the header patterns presumably exercises case-insensitive
        # matching -- confirm against self.match().
        self.match("foo-bounce-.*@example.com", None, "USER1@example.com",
                   body="hello, world",
                   header="X-Foo: FOO",
                   anti_header="Received:")
        self.match("foo-bounce-.*@example.com", None, "user2@EXAMPLE.com",
                   body="hello, world",
                   header="x-foo: foo",
                   anti_header="Received:")
        self.no_more_mail()

    def check_that_moderation_box_is_empty(self):
        """Fail unless the list's moderation box directory has no entries."""
        ml = self.mlm.open_list("foo@example.com")
        self.failUnlessEqual(os.listdir(ml.moderation_box.boxdir), [])

    def testSubscriberPostsToUnmoderated(self):
        # Free posting: a subscriber's message goes straight to the list.
        self.configure_list("free", "free")
        self.send("user1@example.com", "foo@example.com", self.msg)
        self.check_mail_to_list()

    def testOutsiderPostsToUnmoderated(self):
        # Free posting: a non-subscriber's message also goes straight out.
        self.configure_list("free", "free")
        self.send("outsider@example.com", "foo@example.com", self.msg)
        self.check_mail_to_list()

    def testSubscriberPostToAutomoderated(self):
        # "auto" posting: a subscriber's message is delivered without
        # anything being left in the moderation box.
        self.configure_list("free", "auto")
        self.check_that_moderation_box_is_empty()
        self.send("user1@example.com", "foo@example.com", self.msg)
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

    def testOutsiderPostsToAutomoderatedRejected(self):
        # "auto" posting: an outsider's message is held; after the
        # listmaster rejects it the moderation box must be empty again.
        self.configure_list("free", "auto")
        self.check_that_moderation_box_is_empty()
        self.send("outsider@example.com", "foo@example.com", self.msg)
        self.reject("outsider@example.com")
        self.check_that_moderation_box_is_empty()

    def testOutsiderPostsToAutomoderatedApproved(self):
        # "auto" posting: an outsider's message is held; after approval it
        # is delivered and the moderation box is empty again.
        self.configure_list("free", "auto")
        self.check_that_moderation_box_is_empty()
        self.send("outsider@example.com", "foo@example.com", self.msg)
        self.approve("outsider@example.com")
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

    def testSubscriberPostsToModeratedRejected(self):
        # Moderated posting: even a subscriber's message is held; rejecting
        # it must leave the moderation box empty.
        self.configure_list("free", "moderated")
        self.check_that_moderation_box_is_empty()
        self.send("user1@example.com", "foo@example.com", self.msg)
        self.reject("user1@example.com")
        self.check_that_moderation_box_is_empty()

    # NOTE: "Mderated" in the name below is a typo for "Moderated"; the
    # name is kept as-is so that existing test IDs do not change.
    def testOutsiderPostsToMderatedApproved(self):
        # Moderated posting: an outsider's message is held; after approval
        # it is delivered and the moderation box is empty again.
        self.configure_list("free", "moderated")
        self.check_that_moderation_box_is_empty()
        self.send("outsider@example.com", "foo@example.com", self.msg)
        self.approve("outsider@example.com")
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

    def testSubscriberPostsWithRequestToBeModerated(self):
        # On a free list, force_moderation=1 makes the posting go through
        # moderation anyway: the poster gets a "Please wait" mail and the
        # listmaster gets an approval request.
        self.configure_list("free", "free")

        self.check_that_moderation_box_is_empty()
        self.send("user1@example.com", "foo@example.com", self.msg,
                  force_moderation=1)
        self.match("foo-ignore@example.com",
                   None,
                   "user1@example.com",
                   "Subject: Please wait")
        approval_address = self.match("foo-ignore@example.com",
                                      "foo-approve-[^@]*@example.com",
                                      "listmaster@example.com")
        self.no_more_mail()

        # Approving as listmaster releases the posting to the list.
        self.send("listmaster@example.com", approval_address)
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

    def testSubscriberPostsWithModerationOverride(self):
        # On a moderated list, force_posting=1 delivers the message without
        # leaving anything in the moderation box.
        self.configure_list("moderated", "moderated")
        self.send("user1@example.com", "foo@example.com", self.msg,
                  force_posting=1)
        self.check_mail_to_list()
        self.check_that_moderation_box_is_empty()

class BounceTestCases(IncomingBase):

    def check_subscriber_status(self, must_be):
        """Fail unless every subscriber group's "status" equals must_be.

        The list foo@example.com is reopened through self.mlm on every call.
        """
        subscribers = self.mlm.open_list("foo@example.com").subscribers
        for group_id in subscribers.groups():
            status = subscribers.get(group_id, "status")
            self.failUnlessEqual(status, must_be)

    def bounce_sent_mail(self):
        """Mail "eek" to the sender address of every caught outgoing mail.

        After each such mail, self.sent_mail is required to be empty.
        """
        # Iterate over a snapshot: self.sent_mail is expected to be empty
        # after each send(), so the live list cannot be walked directly.
        caught = list(self.sent_mail)
        for mail in caught:
            bounce_address = mail["sender"]
            self.send("something@example.com", bounce_address, "eek")
            self.failUnlessEqual(len(self.sent_mail), 0)

    def send_mail_to_list_then_bounce_everything(self):
        """Post "hello" to the list, then bounce every resulting mail.

        Each caught outgoing mail is answered with "eek" sent to its sender
        address; after each such mail self.sent_mail is required to be
        empty.
        """
        self.send("user@example.com", "foo@example.com", "hello")
        # Iterate over a snapshot: self.sent_mail is expected to be empty
        # after each send(), so the live list cannot be walked directly.
        deliveries = list(self.sent_mail)
        for delivery in deliveries:
            bounce_address = delivery["sender"]
            self.send("foo@example.com", bounce_address, "eek")
            self.failUnlessEqual(len(self.sent_mail), 0)

    def testBounceOnceThenRecover(self):
        # Status sequence checked here: "ok" -> "bounced" (after a bounce)
        # -> "probed" (first cleaning_woman run) -> "ok" (second run, with
        # the probe not having bounced).
        list_name = "foo@example.com"

        self.check_subscriber_status("ok")
        self.send_mail_to_list_then_bounce_everything()

        self.check_subscriber_status("bounced")

        # Every group must now record a bounce id that the bounce box has.
        ml = self.mlm.open_list(list_name)
        subscribers = ml.subscribers
        for group_id in subscribers.groups():
            recorded = subscribers.get(group_id, "bounce-id")
            self.failUnless(recorded)
            self.failUnless(ml.bounce_box.has(recorded))

        # The bounce timestamps must be recent (within ten seconds of now).
        # Then backdate them to "69.0" -- presumably so that
        # cleaning_woman considers the bounces old enough to act on.
        bounce_ids = []
        now = time.time()
        ml = self.mlm.open_list(list_name)
        subscribers = ml.subscribers
        subscribers.lock()
        for group_id in subscribers.groups():
            stamp = float(subscribers.get(group_id, "timestamp-bounced"))
            self.failUnless(abs(stamp - now) < 10.0)
            subscribers.set(group_id, "timestamp-bounced", "69.0")
            bounce_ids.append(subscribers.get(group_id, "bounce-id"))
        subscribers.save()

        # First run: groups become "probed", bounce records are kept.
        # no_op is passed so that nothing is done with mail sent here.
        self.mlm.cleaning_woman(no_op)
        self.check_subscriber_status("probed")

        for kept_id in bounce_ids:
            self.failUnless(ml.bounce_box.has(kept_id))

        # Second run: both groups survive, are "ok" again, and their
        # bounce records are gone.
        self.mlm.cleaning_woman(no_op)
        ml = self.mlm.open_list(list_name)
        group_count = len(ml.subscribers.groups())
        self.failUnlessEqual(group_count, 2)
        self.check_subscriber_status("ok")
        for gone_id in bounce_ids:
            self.failUnless(not ml.bounce_box.has(gone_id))

    def testBounceProbeAlso(self):
        # Status sequence checked here: "ok" -> "bounced" -> "probed" ->
        # "probebounced" (the probe bounces too), after which the next
        # cleaning_woman run leaves no subscriber groups at all.
        list_name = "foo@example.com"

        self.check_subscriber_status("ok")
        self.send_mail_to_list_then_bounce_everything()
        self.check_subscriber_status("bounced")

        # Every group must now record a bounce id that the bounce box has.
        ml = self.mlm.open_list(list_name)
        subscribers = ml.subscribers
        for group_id in subscribers.groups():
            recorded = subscribers.get(group_id, "bounce-id")
            self.failUnless(recorded)
            self.failUnless(ml.bounce_box.has(recorded))

        # The bounce timestamps must be recent (within ten seconds of now).
        # Then backdate them to "69.0" -- presumably so that
        # cleaning_woman considers the bounces old enough to act on.
        bounce_ids = []
        now = time.time()
        ml = self.mlm.open_list(list_name)
        subscribers = ml.subscribers
        subscribers.lock()
        for group_id in subscribers.groups():
            stamp = float(subscribers.get(group_id, "timestamp-bounced"))
            self.failUnless(abs(stamp - now) < 10.0)
            subscribers.set(group_id, "timestamp-bounced", "69.0")
            bounce_ids.append(subscribers.get(group_id, "bounce-id"))
        subscribers.save()

        # This run gets self.catch_sendmail instead of no_op so that the
        # mail it sends ends up in self.sent_mail and can be bounced.
        self.sent_mail = []
        self.mlm.cleaning_woman(self.catch_sendmail)
        self.check_subscriber_status("probed")
        for kept_id in bounce_ids:
            self.failUnless(ml.bounce_box.has(kept_id))
        self.bounce_sent_mail()
        self.check_subscriber_status("probebounced")

        # After the probes have bounced, the next run must leave no groups
        # and no bounce records.
        self.mlm.cleaning_woman(no_op)
        ml = self.mlm.open_list(list_name)
        group_count = len(ml.subscribers.groups())
        self.failUnlessEqual(group_count, 0)
        for gone_id in bounce_ids:
            self.failUnless(not ml.bounce_box.has(gone_id))

    def testCleaningWomanJoinsAndBounceSplitsGroups(self):
        list_name = "foo@example.com"

        # Precondition: each group holds exactly one address and has the
        # placeholder bounce id.  Give every group a fake bounce record
        # (two empty files in the bounce box) and an ancient creation
        # timestamp.
        ml = self.mlm.open_list(list_name)
        subscribers = ml.subscribers
        bouncedir = os.path.join(ml.dirname, "bounce-box")
        subscribers.lock()
        for group_id in subscribers.groups():
            members = subscribers.in_group(group_id)
            self.failUnlessEqual(len(members), 1)
            previous = subscribers.get(group_id, "bounce-id")
            self.failUnlessEqual(previous, "..notexist..")

            fake_id = "bounce-" + group_id
            subscribers.set(group_id, "bounce-id", fake_id)
            record = os.path.join(bouncedir, fake_id)
            self.failUnless(not os.path.isfile(record))
            # Create the record file and its ".address" companion, both
            # empty.
            for suffix in ("", ".address"):
                open(record + suffix, "w").close()
            self.failUnless(os.path.isfile(record))

            subscribers.set(group_id, "timestamp-created", "1")
        subscribers.save()

        # cleaning_woman must join the two old groups into one and leave
        # the bounce box directory empty.
        self.failUnlessEqual(len(ml.subscribers.groups()), 2)
        self.mlm.cleaning_woman(no_op)
        ml = self.mlm.open_list(list_name)
        self.failUnlessEqual(len(ml.subscribers.groups()), 1)
        leftovers = os.listdir(bouncedir)
        self.failUnlessEqual(leftovers, [])

        # A bounce must split the single group into two again.
        self.send_mail_to_list_then_bounce_everything()
        ml = self.mlm.open_list(list_name)
        self.failUnlessEqual(len(ml.subscribers.groups()), 2)

        # Running cleaning_woman right away must not join them: the groups
        # are new and should not be joined for a week.
        self.failUnlessEqual(len(ml.subscribers.groups()), 2)
        self.mlm.cleaning_woman(no_op)
        ml = self.mlm.open_list(list_name)
        self.failUnlessEqual(len(ml.subscribers.groups()), 2)
