Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Doc/library/tarfile.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ Some facts and figures:
a Zstandard dictionary used to improve compression of smaller amounts of
data.

For modes ``'w:gz'`` and ``'w|gz'``, :func:`tarfile.open` accepts the
keyword argument *mtime* to create a gzip archive header with that mtime. By
default, the mtime is set to the time of creation of the archive.

For special purposes, there is a second format for *mode*:
``'filemode|[compression]'``. :func:`tarfile.open` will return a :class:`TarFile`
object that processes its data as a stream of blocks. No random seeking will
Expand Down
18 changes: 11 additions & 7 deletions Lib/tarfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ class _Stream:
"""

def __init__(self, name, mode, comptype, fileobj, bufsize,
compresslevel, preset):
compresslevel, preset, mtime):
"""Construct a _Stream object.
"""
self._extfileobj = True
Expand Down Expand Up @@ -374,7 +374,7 @@ def __init__(self, name, mode, comptype, fileobj, bufsize,
self.exception = zlib.error
self._init_read_gz()
else:
self._init_write_gz(compresslevel)
self._init_write_gz(compresslevel, mtime)

elif comptype == "bz2":
try:
Expand Down Expand Up @@ -423,15 +423,17 @@ def __del__(self):
if hasattr(self, "closed") and not self.closed:
self.close()

def _init_write_gz(self, compresslevel):
def _init_write_gz(self, compresslevel, mtime):
"""Initialize for writing with gzip compression.
"""
self.cmp = self.zlib.compressobj(compresslevel,
self.zlib.DEFLATED,
-self.zlib.MAX_WBITS,
self.zlib.DEF_MEM_LEVEL,
0)
timestamp = struct.pack("<L", int(time.time()))
if mtime is None:
mtime = int(time.time())
timestamp = struct.pack("<L", mtime)
self.__write(b"\037\213\010\010" + timestamp + b"\002\377")
if self.name.endswith(".gz"):
self.name = self.name[:-3]
Expand Down Expand Up @@ -1726,7 +1728,7 @@ class TarFile(object):
def __init__(self, name=None, mode="r", fileobj=None, format=None,
tarinfo=None, dereference=None, ignore_zeros=None, encoding=None,
errors="surrogateescape", pax_headers=None, debug=None,
errorlevel=None, copybufsize=None, stream=False):
errorlevel=None, copybufsize=None, stream=False, mtime=None):
"""Open an (uncompressed) tar archive 'name'. 'mode' is either 'r' to
read from an existing archive, 'a' to append data to an existing
file or 'w' to create a new file overwriting an existing one. 'mode'
Expand Down Expand Up @@ -1932,8 +1934,9 @@ def not_compressed(comptype):

compresslevel = kwargs.pop("compresslevel", 6)
preset = kwargs.pop("preset", None)
mtime = kwargs.pop("mtime", None)
stream = _Stream(name, filemode, comptype, fileobj, bufsize,
compresslevel, preset)
compresslevel, preset, mtime)
try:
t = cls(name, filemode, stream, **kwargs)
except:
Expand Down Expand Up @@ -1969,7 +1972,8 @@ def gzopen(cls, name, mode="r", fileobj=None, compresslevel=6, **kwargs):
raise CompressionError("gzip module is not available") from None

try:
fileobj = GzipFile(name, mode + "b", compresslevel, fileobj)
mtime = kwargs.pop("mtime", None)
fileobj = GzipFile(name, mode + "b", compresslevel, fileobj, mtime=mtime)
except OSError as e:
if fileobj is not None and mode == 'r':
raise ReadError("not a gzip file") from e
Expand Down
35 changes: 35 additions & 0 deletions Lib/test/test_tarfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re
import warnings
import stat
import time

import unittest
import unittest.mock
Expand Down Expand Up @@ -1809,6 +1810,23 @@ def test_source_directory_not_leaked(self):
payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
assert os.path.dirname(tmpname) not in payload

def test_create_with_mtime(self):
tarfile.open(tmpname, self.mode, mtime=0).close()
with self.open(tmpname, 'r') as fobj:
fobj.read()
self.assertEqual(fobj.mtime, 0)

def test_create_without_mtime(self):
timestamp = 12345
orig_time = time.time
try:
time.time = lambda: timestamp
tarfile.open(tmpname, self.mode).close()
finally:
time.time = orig_time
with self.open(tmpname, 'r') as fobj:
fobj.read()
self.assertEqual(fobj.mtime, timestamp)

class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
decompressor = bz2.BZ2Decompressor if bz2 else None
Expand Down Expand Up @@ -2115,6 +2133,23 @@ def test_create_with_compresslevel(self):
with tarfile.open(tmpname, 'r:gz', compresslevel=1) as tobj:
pass

def test_create_with_mtime(self):
tarfile.open(tmpname, self.mode, mtime=0).close()
with self.open(tmpname, 'rb') as fobj:
fobj.read()
self.assertEqual(fobj.mtime, 0)

def test_create_without_mtime(self):
timestamp = 56789
orig_time = time.time
try:
time.time = lambda: timestamp
tarfile.open(tmpname, self.mode).close()
finally:
time.time = orig_time
with self.open(tmpname, 'r') as fobj:
fobj.read()
self.assertEqual(fobj.mtime, timestamp)

class Bz2CreateTest(Bz2Test, CreateTest):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add optional ``mtime`` argument to :func:`tarfile.open`, for setting the ``mtime`` header field in ``.tar.gz`` archives.
Loading