K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / share / apport / testsuite /
server ip : 172.67.156.115

your ip : 172.70.127.135

H O M E


Filename/usr/share/apport/testsuite/test_problem_report.py
Size31.47 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:55
Last modified04-Apr-2014 22:30
Last accessed06-Jul-2025 19:11
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
# vim: set encoding=UTF-8 fileencoding=UTF-8 :
import unittest, tempfile, os, email, gzip, time, sys

from io import BytesIO
import problem_report

bin_data = b'ABABABABAB\0\0\0Z\x01\x02'

if sys.version < '3':
# backwards compatibility shim
email.message_from_binary_file = email.message_from_file


class T(unittest.TestCase):
def test_basic_operations(self):
'''basic creation and operation.'''

pr = problem_report.ProblemReport()
pr['foo'] = 'bar'
pr['bar'] = ' foo bar\nbaz\n blip '
pr['dash-key'] = '1'
pr['dot.key'] = '1'
pr['underscore_key'] = '1'
self.assertEqual(pr['foo'], 'bar')
self.assertEqual(pr['bar'], ' foo bar\nbaz\n blip ')
self.assertEqual(pr['ProblemType'], 'Crash')
self.assertTrue(time.strptime(pr['Date']))
self.assertEqual(pr['dash-key'], '1')
self.assertEqual(pr['dot.key'], '1')
self.assertEqual(pr['underscore_key'], '1')

def test_ctor_arguments(self):
'''non-default constructor arguments.'''

pr = problem_report.ProblemReport('KernelCrash')
self.assertEqual(pr['ProblemType'], 'KernelCrash')
pr = problem_report.ProblemReport(date='19801224 12:34')
self.assertEqual(pr['Date'], '19801224 12:34')

def test_sanity_checks(self):
'''various error conditions.'''

pr = problem_report.ProblemReport()
self.assertRaises(AssertionError, pr.__setitem__, 'a b', '1')
self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
self.assertRaises(AssertionError, pr.__setitem__, 'a', (1,))
self.assertRaises(AssertionError, pr.__setitem__, 'a', ('/tmp/nonexistant', ''))
self.assertRaises(KeyError, pr.__getitem__, 'Nonexistant')

def test_compressed_values(self):
'''handling of CompressedValue values.'''

large_val = b'A' * 5000000

pr = problem_report.ProblemReport()
pr['Foo'] = problem_report.CompressedValue(b'FooFoo!')
pr['Bin'] = problem_report.CompressedValue()
pr['Bin'].set_value(bin_data)
pr['Large'] = problem_report.CompressedValue(large_val)

self.assertTrue(isinstance(pr['Foo'], problem_report.CompressedValue))
self.assertTrue(isinstance(pr['Bin'], problem_report.CompressedValue))
self.assertEqual(pr['Foo'].get_value(), b'FooFoo!')
self.assertEqual(pr['Bin'].get_value(), bin_data)
self.assertEqual(pr['Large'].get_value(), large_val)
self.assertEqual(len(pr['Foo']), 7)
self.assertEqual(len(pr['Bin']), len(bin_data))
self.assertEqual(len(pr['Large']), len(large_val))

io = BytesIO()
pr['Bin'].write(io)
self.assertEqual(io.getvalue(), bin_data)
io = BytesIO()
pr['Large'].write(io)
self.assertEqual(io.getvalue(), large_val)

pr['Multiline'] = problem_report.CompressedValue(b'\1\1\1\n\2\2\n\3\3\3')
self.assertEqual(pr['Multiline'].splitlines(),
[b'\1\1\1', b'\2\2', b'\3\3\3'])

# test writing of reports with CompressedValues
io = BytesIO()
pr.write(io)
io.seek(0)
pr = problem_report.ProblemReport()
pr.load(io)
self.assertEqual(pr['Foo'], 'FooFoo!')
self.assertEqual(pr['Bin'], bin_data)
self.assertEqual(pr['Large'], large_val.decode('ASCII'))

def test_write(self):
'''write() and proper formatting.'''

pr = problem_report.ProblemReport(date='now!')
pr['Simple'] = 'bar'
if sys.version.startswith('2'):
pr['SimpleUTF8'] = '1äö2Φ3'
pr['SimpleUnicode'] = '1äö2Φ3'.decode('UTF-8')
pr['TwoLineUnicode'] = 'pi-π\nnu-η'.decode('UTF-8')
pr['TwoLineUTF8'] = 'pi-π\nnu-η'
else:
pr['SimpleUTF8'] = '1äö2Φ3'.encode('UTF-8')
pr['SimpleUnicode'] = '1äö2Φ3'
pr['TwoLineUnicode'] = 'pi-π\nnu-η'
pr['TwoLineUTF8'] = 'pi-π\nnu-η'.encode('UTF-8')
pr['WhiteSpace'] = ' foo bar\nbaz\n blip \n\nafteremptyline'
# Unicode with a non-space low ASCII character \x05 in it
pr['UnprintableUnicode'] = b'a\xc3\xa4\x05z1\xc3\xa9'.decode('UTF-8')
io = BytesIO()
pr.write(io)
expected = '''ProblemType: Crash
Date: now!
Simple: bar
SimpleUTF8: 1äö2Φ3
SimpleUnicode: 1äö2Φ3
TwoLineUTF8:
pi-π
nu-η
TwoLineUnicode:
pi-π
nu-η
UnprintableUnicode: aä\x05z1é
WhiteSpace:
foo bar
baz
blip

afteremptyline
'''
if sys.version > '3':
expected = expected.encode('UTF-8')
self.assertEqual(io.getvalue(), expected)

def test_write_append(self):
'''write() with appending to an existing file.'''

pr = problem_report.ProblemReport(date='now!')
pr['Simple'] = 'bar'
pr['WhiteSpace'] = ' foo bar\nbaz\n blip '
io = BytesIO()
pr.write(io)

pr.clear()
pr['Extra'] = 'appended'
pr.write(io)

self.assertEqual(io.getvalue(),
b'''ProblemType: Crash
Date: now!
Simple: bar
WhiteSpace:
foo bar
baz
blip
Extra: appended
''')

temp = tempfile.NamedTemporaryFile()
temp.write(bin_data)
temp.flush()

pr = problem_report.ProblemReport(date='now!')
pr['File'] = (temp.name,)
io = BytesIO()
pr.write(io)
temp.close()

pr.clear()
pr['Extra'] = 'appended'
pr.write(io)

io.seek(0)
pr = problem_report.ProblemReport()
pr.load(io)

self.assertEqual(pr['Date'], 'now!')
self.assertEqual(pr['File'], bin_data)
self.assertEqual(pr['Extra'], 'appended')

def test_load(self):
'''load() with various formatting.'''

pr = problem_report.ProblemReport()
pr.load(BytesIO(b'''ProblemType: Crash
Date: now!
Simple: bar
WhiteSpace:
foo bar
baz
blip
'''))
self.assertEqual(pr['ProblemType'], 'Crash')
self.assertEqual(pr['Date'], 'now!')
self.assertEqual(pr['Simple'], 'bar')
self.assertEqual(pr['WhiteSpace'], ' foo bar\nbaz\n blip ')

# test last field a bit more
pr.load(BytesIO(b'''ProblemType: Crash
Date: now!
Simple: bar
WhiteSpace:
foo bar
baz
blip

'''))
self.assertEqual(pr['ProblemType'], 'Crash')
self.assertEqual(pr['Date'], 'now!')
self.assertEqual(pr['Simple'], 'bar')
self.assertEqual(pr['WhiteSpace'], ' foo bar\nbaz\n blip \n')

# last field might not be \n terminated
pr.load(BytesIO(b'''ProblemType: Crash
Date: now!
Simple: bar
WhiteSpace:
foo
bar'''))
self.assertEqual(pr['ProblemType'], 'Crash')
self.assertEqual(pr['Date'], 'now!')
self.assertEqual(pr['Simple'], 'bar')
self.assertEqual(pr['WhiteSpace'], 'foo\nbar')

pr = problem_report.ProblemReport()
pr.load(BytesIO(b'''ProblemType: Crash
WhiteSpace:
foo bar
baz

blip
Last: foo
'''))
self.assertEqual(pr['WhiteSpace'], ' foo bar\nbaz\n\n blip ')
self.assertEqual(pr['Last'], 'foo')

pr.load(BytesIO(b'''ProblemType: Crash
WhiteSpace:
foo bar
baz
blip
Last: foo

'''))
self.assertEqual(pr['WhiteSpace'], ' foo bar\nbaz\n blip ')
self.assertEqual(pr['Last'], 'foo\n')

# empty lines in values must have a leading space in coding
invalid_spacing = BytesIO(b'''WhiteSpace:
first

second
''')
pr = problem_report.ProblemReport()
self.assertRaises(ValueError, pr.load, invalid_spacing)

# test that load() cleans up properly
pr.load(BytesIO(b'ProblemType: Crash'))
self.assertEqual(list(pr.keys()), ['ProblemType'])

def test_write_file(self):
'''writing a report with binary file data.'''

temp = tempfile.NamedTemporaryFile()
temp.write(bin_data)
temp.flush()

pr = problem_report.ProblemReport(date='now!')
pr['File'] = (temp.name,)
pr['Afile'] = (temp.name,)
io = BytesIO()
pr.write(io)
temp.close()

self.assertEqual(io.getvalue(),
b'''ProblemType: Crash
Date: now!
Afile: base64
H4sICAAAAAAC/0FmaWxlAA==
c3RyhEIGBoYoRiYAM5XUCxAAAAA=
File: base64
H4sICAAAAAAC/0ZpbGUA
c3RyhEIGBoYoRiYAM5XUCxAAAAA=
''')

# force compression/encoding bool
temp = tempfile.NamedTemporaryFile()
temp.write(b'foo\0bar')
temp.flush()
pr = problem_report.ProblemReport(date='now!')
pr['File'] = (temp.name, False)
io = BytesIO()
pr.write(io)

self.assertEqual(io.getvalue(),
b'''ProblemType: Crash
Date: now!
File: foo\0bar
''')

pr['File'] = (temp.name, True)
io = BytesIO()
pr.write(io)

self.assertEqual(io.getvalue(),
b'''ProblemType: Crash
Date: now!
File: base64
H4sICAAAAAAC/0ZpbGUA
S8vPZ0hKLAIACq50HgcAAAA=
''')
temp.close()

def test_write_fileobj(self):
'''writing a report with a pointer to a file-like object.'''

tempbin = BytesIO(bin_data)
tempasc = BytesIO(b'Hello World')

pr = problem_report.ProblemReport(date='now!')
pr['BinFile'] = (tempbin,)
pr['AscFile'] = (tempasc, False)
io = BytesIO()
pr.write(io)
io.seek(0)

pr = problem_report.ProblemReport()
pr.load(io)
self.assertEqual(pr['BinFile'], tempbin.getvalue())
self.assertEqual(pr['AscFile'], tempasc.getvalue().decode())

def test_write_empty_fileobj(self):
'''writing a report with a pointer to a file-like object with enforcing non-emptyness.'''

tempbin = BytesIO(b'')
tempasc = BytesIO(b'')

pr = problem_report.ProblemReport(date='now!')
pr['BinFile'] = (tempbin, True, None, True)
io = BytesIO()
self.assertRaises(IOError, pr.write, io)

pr = problem_report.ProblemReport(date='now!')
pr['AscFile'] = (tempasc, False, None, True)
io = BytesIO()
self.assertRaises(IOError, pr.write, io)

def test_write_delayed_fileobj(self):
'''writing a report with file pointers and delayed data.'''

(fout, fin) = os.pipe()

if os.fork() == 0:
os.close(fout)
time.sleep(0.3)
os.write(fin, b'ab' * 512 * 1024)
time.sleep(0.3)
os.write(fin, b'hello')
time.sleep(0.3)
os.write(fin, b' world')
os.close(fin)
os._exit(0)

os.close(fin)

pr = problem_report.ProblemReport(date='now!')
io = BytesIO()
with os.fdopen(fout, 'rb') as f:
pr['BinFile'] = (f,)
pr.write(io)
assert os.wait()[1] == 0

io.seek(0)

pr2 = problem_report.ProblemReport()
pr2.load(io)
self.assertEqual(pr2['BinFile'], 'ab' * 512 * 1024 + 'hello world')

def test_read_file(self):
'''reading a report with binary data.'''

bin_report = b'''ProblemType: Crash
Date: now!
File: base64
H4sICAAAAAAC/0ZpbGUA
c3RyhEIGBoYoRiYAM5XUCxAAAAA=
Foo: Bar
'''

# test with reading everything
pr = problem_report.ProblemReport()
pr.load(BytesIO(bin_report))
self.assertEqual(pr['File'], bin_data)
self.assertEqual(pr.has_removed_fields(), False)

# test with skipping binary data
pr.load(BytesIO(bin_report), binary=False)
self.assertEqual(pr['File'], '')
self.assertEqual(pr.has_removed_fields(), True)

# test with keeping compressed binary data
pr.load(BytesIO(bin_report), binary='compressed')
self.assertEqual(pr['Foo'], 'Bar')
self.assertEqual(pr.has_removed_fields(), False)
self.assertTrue(isinstance(pr['File'], problem_report.CompressedValue))
self.assertEqual(len(pr['File']), len(bin_data))
self.assertEqual(pr['File'].get_value(), bin_data)

def test_read_file_legacy(self):
'''reading a report with binary data in legacy format without gzip
header.'''

bin_report = b'''ProblemType: Crash
Date: now!
File: base64
eJw=
c3RyxIAMcBAFAG55BXk=
Foo: Bar
'''

# test with reading everything
pr = problem_report.ProblemReport()
pr.load(BytesIO(bin_report))
self.assertEqual(pr['File'], b'AB' * 10 + b'\0' * 10 + b'Z')
self.assertEqual(pr.has_removed_fields(), False)

# test with skipping binary data
pr.load(BytesIO(bin_report), binary=False)
self.assertEqual(pr['File'], '')
self.assertEqual(pr.has_removed_fields(), True)

# test with keeping CompressedValues
pr.load(BytesIO(bin_report), binary='compressed')
self.assertEqual(pr.has_removed_fields(), False)
self.assertEqual(len(pr['File']), 31)
self.assertEqual(pr['File'].get_value(), b'AB' * 10 + b'\0' * 10 + b'Z')
io = BytesIO()
pr['File'].write(io)
io.seek(0)
self.assertEqual(io.read(), b'AB' * 10 + b'\0' * 10 + b'Z')

def test_big_file(self):
'''writing and re-decoding a big random file.'''

# create 1 MB random file
temp = tempfile.NamedTemporaryFile()
data = os.urandom(1048576)
temp.write(data)
temp.flush()

# write it into problem report
pr = problem_report.ProblemReport()
pr['File'] = (temp.name,)
pr['Before'] = 'xtestx'
pr['ZAfter'] = 'ytesty'
io = BytesIO()
pr.write(io)
temp.close()

# read it again
io.seek(0)
pr = problem_report.ProblemReport()
pr.load(io)

self.assertTrue(pr['File'] == data)
self.assertEqual(pr['Before'], 'xtestx')
self.assertEqual(pr['ZAfter'], 'ytesty')

# write it again
io2 = BytesIO()
pr.write(io2)
self.assertTrue(io.getvalue() == io2.getvalue())

# check gzip compatibility
io.seek(0)
pr = problem_report.ProblemReport()
pr.load(io, binary='compressed')
self.assertEqual(pr['File'].get_value(), data)

def test_size_limit(self):
'''writing and a big random file with a size limit key.'''

# create 1 MB random file
temp = tempfile.NamedTemporaryFile()
data = os.urandom(1048576)
temp.write(data)
temp.flush()

# write it into problem report
pr = problem_report.ProblemReport()
pr['FileSmallLimit'] = (temp.name, True, 100)
pr['FileLimitMinus1'] = (temp.name, True, 1048575)
pr['FileExactLimit'] = (temp.name, True, 1048576)
pr['FileLimitPlus1'] = (temp.name, True, 1048577)
pr['FileLimitNone'] = (temp.name, True, None)
pr['Before'] = 'xtestx'
pr['ZAfter'] = 'ytesty'
io = BytesIO()
pr.write(io)
temp.close()

# read it again
io.seek(0)
pr = problem_report.ProblemReport()
pr.load(io)

self.assertFalse('FileSmallLimit' in pr)
self.assertFalse('FileLimitMinus1' in pr)
self.assertTrue(pr['FileExactLimit'] == data)
self.assertTrue(pr['FileLimitPlus1'] == data)
self.assertTrue(pr['FileLimitNone'] == data)
self.assertEqual(pr['Before'], 'xtestx')
self.assertEqual(pr['ZAfter'], 'ytesty')

def test_iter(self):
'''problem_report.ProblemReport iteration.'''

pr = problem_report.ProblemReport()
pr['foo'] = 'bar'

keys = []
for k in pr:
keys.append(k)
keys.sort()
self.assertEqual(' '.join(keys), 'Date ProblemType foo')

self.assertEqual(len([k for k in pr if k != 'foo']), 2)

def test_modify(self):
'''reading, modifying fields, and writing back.'''

report = b'''ProblemType: Crash
Date: now!
Long:
xxx
.
yyy
Short: Bar
File: base64
H4sICAAAAAAC/0ZpbGUA
c3RyxIAMcBAFAK/2p9MfAAAA
'''

pr = problem_report.ProblemReport()
pr.load(BytesIO(report))

self.assertEqual(pr['Long'], 'xxx\n.\nyyy')

# write back unmodified
io = BytesIO()
pr.write(io)
self.assertEqual(io.getvalue(), report)

pr['Short'] = 'aaa\nbbb'
pr['Long'] = '123'
io = BytesIO()
pr.write(io)
self.assertEqual(io.getvalue(),
b'''ProblemType: Crash
Date: now!
Long: 123
Short:
aaa
bbb
File: base64
H4sICAAAAAAC/0ZpbGUA
c3RyxIAMcBAFAK/2p9MfAAAA
''')

def test_add_to_existing(self):
'''adding information to an existing report.'''

# original report
pr = problem_report.ProblemReport()
pr['old1'] = '11'
pr['old2'] = '22'

(fd, rep) = tempfile.mkstemp()
os.close(fd)
with open(rep, 'wb') as f:
pr.write(f)

origstat = os.stat(rep)

# create a new one and add it
pr = problem_report.ProblemReport()
pr.clear()
pr['new1'] = '33'

pr.add_to_existing(rep, keep_times=True)

# check keep_times
newstat = os.stat(rep)
self.assertEqual(origstat.st_mode, newstat.st_mode)
self.assertAlmostEqual(origstat.st_atime, newstat.st_atime, 1)
self.assertAlmostEqual(origstat.st_mtime, newstat.st_mtime, 1)

# check report contents
newpr = problem_report.ProblemReport()
with open(rep, 'rb') as f:
newpr.load(f)
self.assertEqual(newpr['old1'], '11')
self.assertEqual(newpr['old2'], '22')
self.assertEqual(newpr['new1'], '33')

# create a another new one and add it, but make sure mtime must be
# different
time.sleep(1)
with open(rep) as f:
f.read() # bump atime
time.sleep(1)

pr = problem_report.ProblemReport()
pr.clear()
pr['new2'] = '44'

pr.add_to_existing(rep)

# check that timestamps have been updates
newstat = os.stat(rep)
self.assertEqual(origstat.st_mode, newstat.st_mode)
self.assertNotEqual(origstat.st_mtime, newstat.st_mtime)
# skip atime check if filesystem is mounted noatime
skip_atime = False
dir = rep
while len(dir) > 1:
dir, filename = os.path.split(dir)
if os.path.ismount(dir):
with open('/proc/mounts') as f:
for line in f:
mount, fs, options = line.split(' ')[1:4]
if mount == dir and 'noatime' in options.split(','):
skip_atime = True
break
break
if not skip_atime:
self.assertNotEqual(origstat.st_atime, newstat.st_atime)

# check report contents
newpr = problem_report.ProblemReport()
with open(rep, 'rb') as f:
newpr.load(f)
self.assertEqual(newpr['old1'], '11')
self.assertEqual(newpr['old2'], '22')
self.assertEqual(newpr['new1'], '33')
self.assertEqual(newpr['new2'], '44')

os.unlink(rep)

def test_write_mime_text(self):
'''write_mime() for text values.'''

pr = problem_report.ProblemReport(date='now!')
pr['Simple'] = 'bar'
if sys.version.startswith('2'):
pr['SimpleUTF8'] = '1äö2Φ3'
pr['SimpleUnicode'] = '1äö2Φ3'.decode('UTF-8')
pr['TwoLineUnicode'] = 'pi-π\nnu-η\n'.decode('UTF-8')
pr['TwoLineUTF8'] = 'pi-π\nnu-η\n'
else:
pr['SimpleUTF8'] = '1äö2Φ3'.encode('UTF-8')
pr['SimpleUnicode'] = '1äö2Φ3'
pr['TwoLineUnicode'] = 'pi-π\nnu-η\n'
pr['TwoLineUTF8'] = 'pi-π\nnu-η\n'.encode('UTF-8')
pr['SimpleLineEnd'] = 'bar\n'
pr['TwoLine'] = 'first\nsecond\n'
pr['InlineMargin'] = 'first\nsecond\nthird\nfourth\nfifth\n'
pr['Multiline'] = ' foo bar\nbaz\n blip \nline4\nline♥5!!\nłıµ€ ⅝\n'

# still small enough for inline text
pr['Largeline'] = 'A' * 999
pr['LargeMultiline'] = 'A' * 120 + '\n' + 'B' * 90

# too big for inline text, these become attachments
pr['Hugeline'] = 'A' * 10000
pr['HugeMultiline'] = 'A' * 900 + '\n' + 'B' * 900 + '\n' + 'C' * 900
io = BytesIO()
pr.write_mime(io)
io.seek(0)

msg = email.message_from_binary_file(io)
parts = [p for p in msg.walk()]
self.assertEqual(len(parts), 5)

# first part is the multipart container
self.assertTrue(parts[0].is_multipart())

# second part should be an inline text/plain attachments with all short
# fields
self.assertTrue(not parts[1].is_multipart())
self.assertEqual(parts[1].get_content_type(), 'text/plain')
self.assertEqual(parts[1].get_content_charset(), 'utf-8')
self.assertEqual(parts[1].get_filename(), None)
expected = '''ProblemType: Crash
Date: now!
InlineMargin:
first
second
third
fourth
fifth
LargeMultiline:
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
Largeline: %s
Simple: bar
SimpleLineEnd: bar
SimpleUTF8: 1äö2Φ3
SimpleUnicode: 1äö2Φ3
TwoLine:
first
second
TwoLineUTF8:
pi-π
nu-η
TwoLineUnicode:
pi-π
nu-η
''' % pr['Largeline']
if sys.version >= '3':
expected = expected.encode('UTF-8')
self.assertEqual(parts[1].get_payload(decode=True), expected)

# third part should be the HugeMultiline: field as attachment
self.assertTrue(not parts[2].is_multipart())
self.assertEqual(parts[2].get_content_type(), 'text/plain')
self.assertEqual(parts[2].get_content_charset(), 'utf-8')
self.assertEqual(parts[2].get_filename(), 'HugeMultiline.txt')
self.assertEqual(parts[2].get_payload(decode=True), pr['HugeMultiline'].encode())

# fourth part should be the Hugeline: field as attachment
self.assertTrue(not parts[3].is_multipart())
self.assertEqual(parts[3].get_content_type(), 'text/plain')
self.assertEqual(parts[3].get_content_charset(), 'utf-8')
self.assertEqual(parts[3].get_filename(), 'Hugeline.txt')
self.assertEqual(parts[3].get_payload(decode=True), pr['Hugeline'].encode())

# fifth part should be the Multiline: field as attachment
self.assertTrue(not parts[4].is_multipart())
self.assertEqual(parts[4].get_content_type(), 'text/plain')
self.assertEqual(parts[4].get_content_charset(), 'utf-8')
self.assertEqual(parts[4].get_filename(), 'Multiline.txt')
expected = ''' foo bar
baz
blip
line4
line♥5!!
łıµ€ ⅝
'''
if sys.version >= '3':
expected = expected.encode('UTF-8')
self.assertEqual(parts[4].get_payload(decode=True), expected)

def test_write_mime_binary(self):
'''write_mime() for binary values and file references.'''

temp = tempfile.NamedTemporaryFile()
temp.write(bin_data)
temp.flush()

tempgz = tempfile.NamedTemporaryFile()
gz = gzip.GzipFile('File1', 'w', fileobj=tempgz)
gz.write(bin_data)
gz.close()
tempgz.flush()

pr = problem_report.ProblemReport(date='now!')
pr['Context'] = 'Test suite'
pr['File1'] = (temp.name,)
pr['File1.gz'] = (tempgz.name,)
pr['Value1'] = bin_data
with open(tempgz.name, 'rb') as f:
pr['Value1.gz'] = f.read()
pr['ZValue'] = problem_report.CompressedValue(bin_data)
io = BytesIO()
pr.write_mime(io)
io.seek(0)

msg = email.message_from_binary_file(io)
parts = [p for p in msg.walk()]
self.assertEqual(len(parts), 7)

# first part is the multipart container
self.assertTrue(parts[0].is_multipart())

# second part should be an inline text/plain attachments with all short
# fields
self.assertTrue(not parts[1].is_multipart())
self.assertEqual(parts[1].get_content_type(), 'text/plain')
self.assertEqual(parts[1].get_content_charset(), 'utf-8')
self.assertEqual(parts[1].get_filename(), None)
self.assertEqual(parts[1].get_payload(decode=True),
b'ProblemType: Crash\nContext: Test suite\nDate: now!\n')

# third part should be the File1: file contents as gzip'ed attachment
self.assertTrue(not parts[2].is_multipart())
self.assertEqual(parts[2].get_content_type(), 'application/x-gzip')
self.assertEqual(parts[2].get_filename(), 'File1.gz')
f = tempfile.TemporaryFile()
f.write(parts[2].get_payload(decode=True))
f.seek(0)
self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
f.close()

# fourth part should be the File1.gz: file contents as gzip'ed
# attachment; write_mime() should not compress it again
self.assertTrue(not parts[3].is_multipart())
self.assertEqual(parts[3].get_content_type(), 'application/x-gzip')
self.assertEqual(parts[3].get_filename(), 'File1.gz')
f = tempfile.TemporaryFile()
f.write(parts[3].get_payload(decode=True))
f.seek(0)
self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
f.close()

# fifth part should be the Value1: value as gzip'ed attachment
self.assertTrue(not parts[4].is_multipart())
self.assertEqual(parts[4].get_content_type(), 'application/x-gzip')
self.assertEqual(parts[4].get_filename(), 'Value1.gz')
f = tempfile.TemporaryFile()
f.write(parts[4].get_payload(decode=True))
f.seek(0)
self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
f.close()

# sixth part should be the Value1: value as gzip'ed attachment;
# write_mime should not compress it again
self.assertTrue(not parts[5].is_multipart())
self.assertEqual(parts[5].get_content_type(), 'application/x-gzip')
self.assertEqual(parts[5].get_filename(), 'Value1.gz')
f = tempfile.TemporaryFile()
f.write(parts[5].get_payload(decode=True))
f.seek(0)
self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
f.close()

# seventh part should be the ZValue: value as gzip'ed attachment;
# write_mime should not compress it again
self.assertTrue(not parts[6].is_multipart())
self.assertEqual(parts[6].get_content_type(), 'application/x-gzip')
self.assertEqual(parts[6].get_filename(), 'ZValue.gz')
f = tempfile.TemporaryFile()
f.write(parts[6].get_payload(decode=True))
f.seek(0)
self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
f.close()

def test_write_mime_extra_headers(self):
'''write_mime() with extra headers.'''

pr = problem_report.ProblemReport(date='now!')
pr['Simple'] = 'bar'
pr['TwoLine'] = 'first\nsecond\n'
io = BytesIO()
pr.write_mime(io, extra_headers={'Greeting': 'hello world',
'Foo': 'Bar'})
io.seek(0)

msg = email.message_from_binary_file(io)
self.assertEqual(msg['Greeting'], 'hello world')
self.assertEqual(msg['Foo'], 'Bar')
parts = [p for p in msg.walk()]
self.assertEqual(len(parts), 2)

# first part is the multipart container
self.assertTrue(parts[0].is_multipart())

# second part should be an inline text/plain attachments with all short
# fields
self.assertTrue(not parts[1].is_multipart())
self.assertEqual(parts[1].get_content_type(), 'text/plain')
self.assertTrue(b'Simple: bar' in parts[1].get_payload(decode=True))

def test_write_mime_filter(self):
'''write_mime() with key filters.'''

pr = problem_report.ProblemReport(date='now!')
pr['GoodText'] = 'Hi'
pr['BadText'] = 'YouDontSeeMe'
pr['GoodBin'] = bin_data
pr['BadBin'] = 'Y' + '\x05' * 10 + '-'
io = BytesIO()
pr.write_mime(io, skip_keys=['BadText', 'BadBin'])
io.seek(0)

msg = email.message_from_binary_file(io)
parts = [p for p in msg.walk()]
self.assertEqual(len(parts), 3)

# first part is the multipart container
self.assertTrue(parts[0].is_multipart())

# second part should be an inline text/plain attachments with all short
# fields
self.assertTrue(not parts[1].is_multipart())
self.assertEqual(parts[1].get_content_type(), 'text/plain')
self.assertEqual(parts[1].get_content_charset(), 'utf-8')
self.assertEqual(parts[1].get_filename(), None)
self.assertEqual(parts[1].get_payload(decode=True), b'''ProblemType: Crash
Date: now!
GoodText: Hi
''')

# third part should be the GoodBin: field as attachment
self.assertTrue(not parts[2].is_multipart())
f = tempfile.TemporaryFile()
f.write(parts[2].get_payload(decode=True))
f.seek(0)
self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
f.close()

def test_write_mime_order(self):
'''write_mime() with keys ordered.'''

pr = problem_report.ProblemReport(date='now!')
pr['SecondText'] = 'What'
pr['FirstText'] = 'Who'
pr['FourthText'] = 'Today'
pr['ThirdText'] = "I Don't Know"
io = BytesIO()
pr.write_mime(io, priority_fields=['FirstText', 'SecondText',
'ThirdText', 'Unknown', 'FourthText'])
io.seek(0)

msg = email.message_from_binary_file(io)
parts = [p for p in msg.walk()]
self.assertEqual(len(parts), 2)

# first part is the multipart container
self.assertTrue(parts[0].is_multipart())

# second part should be an inline text/plain attachments with all short
# fields
self.assertTrue(not parts[1].is_multipart())
self.assertEqual(parts[1].get_content_type(), 'text/plain')
self.assertEqual(parts[1].get_content_charset(), 'utf-8')
self.assertEqual(parts[1].get_filename(), None)
self.assertEqual(parts[1].get_payload(decode=True), b'''FirstText: Who
SecondText: What
ThirdText: I Don't Know
FourthText: Today
ProblemType: Crash
Date: now!
''')

def test_updating(self):
'''new_keys() and write() with only_new=True.'''

pr = problem_report.ProblemReport()
self.assertEqual(pr.new_keys(), set(['ProblemType', 'Date']))
pr.load(BytesIO(b'''ProblemType: Crash
Date: now!
Foo: bar
Baz: blob
'''))

self.assertEqual(pr.new_keys(), set())

pr['Foo'] = 'changed'
pr['NewKey'] = 'new new'
self.assertEqual(pr.new_keys(), set(['NewKey']))

out = BytesIO()
pr.write(out, only_new=True)
self.assertEqual(out.getvalue(), b'NewKey: new new\n')

def test_import_dict(self):
'''importing a dictionary with update().'''

pr = problem_report.ProblemReport()
pr['oldtext'] = 'Hello world'
pr['oldbin'] = bin_data
pr['overwrite'] = 'I am crap'

d = {}
d['newtext'] = 'Goodbye world'
d['newbin'] = '11\000\001\002\xFFZZ'
d['overwrite'] = 'I am good'

pr.update(d)
self.assertEqual(pr['oldtext'], 'Hello world')
self.assertEqual(pr['oldbin'], bin_data)
self.assertEqual(pr['newtext'], 'Goodbye world')
self.assertEqual(pr['newbin'], '11\000\001\002\xFFZZ')
self.assertEqual(pr['overwrite'], 'I am good')

if __name__ == '__main__':
unittest.main()