/usr/lib/python3/dist-packages/S3/AccessLog.py is in s3cmd 2.0.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | # -*- coding: utf-8 -*-
## Amazon S3 - Access Control List representation
## Author: Michal Ludvig <michal@logix.cz>
## http://www.logix.cz/michal
## License: GPL Version 2
## Copyright: TGRMN Software and contributors
from __future__ import absolute_import, print_function
import sys
from . import S3Uri
from .Exceptions import ParameterError
from .Utils import getTreeFromXml, decode_from_s3
from .ACL import GranteeAnonRead
try:
import xml.etree.ElementTree as ET
except ImportError:
import elementtree.ElementTree as ET
PY3 = (sys.version_info >= (3,0))
__all__ = []
class AccessLog(object):
LOG_DISABLED = "<BucketLoggingStatus></BucketLoggingStatus>"
LOG_TEMPLATE = "<LoggingEnabled><TargetBucket></TargetBucket><TargetPrefix></TargetPrefix></LoggingEnabled>"
def __init__(self, xml = None):
if not xml:
xml = self.LOG_DISABLED
self.tree = getTreeFromXml(xml)
self.tree.attrib['xmlns'] = "http://doc.s3.amazonaws.com/2006-03-01"
def isLoggingEnabled(self):
return (self.tree.find(".//LoggingEnabled") is not None)
def disableLogging(self):
el = self.tree.find(".//LoggingEnabled")
if el:
self.tree.remove(el)
def enableLogging(self, target_prefix_uri):
el = self.tree.find(".//LoggingEnabled")
if not el:
el = getTreeFromXml(self.LOG_TEMPLATE)
self.tree.append(el)
el.find(".//TargetBucket").text = target_prefix_uri.bucket()
el.find(".//TargetPrefix").text = target_prefix_uri.object()
def targetPrefix(self):
if self.isLoggingEnabled():
target_prefix = u"s3://%s/%s" % (
self.tree.find(".//LoggingEnabled//TargetBucket").text,
self.tree.find(".//LoggingEnabled//TargetPrefix").text)
return S3Uri.S3Uri(target_prefix)
else:
return ""
def setAclPublic(self, acl_public):
le = self.tree.find(".//LoggingEnabled")
if le is None:
raise ParameterError("Logging not enabled, can't set default ACL for logs")
tg = le.find(".//TargetGrants")
if not acl_public:
if not tg:
## All good, it's not been there
return
else:
le.remove(tg)
else: # acl_public == True
anon_read = GranteeAnonRead().getElement()
if not tg:
tg = ET.SubElement(le, "TargetGrants")
## What if TargetGrants already exists? We should check if
## AnonRead is there before appending a new one. Later...
tg.append(anon_read)
def isAclPublic(self):
raise NotImplementedError()
def __unicode__(self):
return decode_from_s3(ET.tostring(self.tree))
def __str__(self):
if PY3:
# Return unicode
return ET.tostring(self.tree, encoding="unicode")
else:
# Return bytes
return ET.tostring(self.tree)
__all__.append("AccessLog")
if __name__ == "__main__":
log = AccessLog()
print(log)
log.enableLogging(S3Uri.S3Uri(u"s3://targetbucket/prefix/log-"))
print(log)
log.setAclPublic(True)
print(log)
log.setAclPublic(False)
print(log)
log.disableLogging()
print(log)
# vim:et:ts=4:sts=4:ai
|