/usr/share/pyshared/musiclibrarian/cache.py is in musiclibrarian 1.6-2.1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# cache.py
#
# Copyright 2004 Daniel Burrows
#
# This code maintains a cache of file metadata that is used to speed the
# program startup.
import cPickle
import gzip
import os
import serialize
import stat
import sys
def fingerprint(st):
    """Build the change-detection 'fingerprint' for a stat result.

    st is indexed with the stat.ST_* constants, so it may be anything
    that supports that indexing (e.g. the value returned by os.stat).

    Returns the tuple (mtime, ctime, size).
    """
    modified = st[stat.ST_MTIME]
    changed = st[stat.ST_CTIME]
    size = st[stat.ST_SIZE]
    return modified, changed, size
# Cache readers. The readers support various cache formats (see
# below); the Cache class only produces the most "recent" format.
def read_pickled_cache(file):
    """Load and return the object pickled in the open file-like `file`.

    Reading starts at the file's current position, so any header line
    must already have been consumed by the caller.
    """
    # NOTE(review): unpickling can run arbitrary code; this presumes the
    # cache file was written by this program -- confirm it is never
    # taken from an untrusted location.
    unpickled = cPickle.load(file)
    return unpickled
# Dispatch table: protocol name found in a cache file's header line ->
# function that reads the remainder of the file.
cache_protocols = {'pickle': read_pickled_cache}
class Cache:
    """Encapsulates the on-disk cache of file metadata.  Files are
    indexed by name, and stat fingerprinting is used to detect changes
    to a file.

    The cache is stored gzip-compressed in '<fn>.gz'.  Its first line is
    a 'protocol,format' header; the rest of the file is read by the
    matching entry of cache_protocols.  An uncompressed cache at '<fn>'
    is still read when no compressed one can be opened.

    Note that it is never *required* to modify the cache as files are
    changed (assuming that the (mtime,ctime,size) triple is enough to
    detect changes to a file); the cache is only used when loading a
    new file, and outdated entries will be corrected.  However, it
    *is* recommended that you update the cache via Cache.put after
    writing tags back to a file, in order to avoid unnecessary
    recomputation of cache entries."""

    def __init__(self, fn):
        """Load the cache from the given file.

        fn is the cache path *without* the '.gz' suffix.  Loading never
        raises for ordinary errors: an I/O error silently yields an
        empty cache, and any other error is reported through
        sys.excepthook before falling back to an empty cache."""
        self.__dirty = False
        self.fn = fn
        f = None
        try:
            gz_fn = '%s.gz' % fn
            if os.path.exists(gz_fn):
                try:
                    f = gzip.GzipFile(gz_fn, 'r')
                except (IOError, OSError):
                    # Could not open the compressed cache; fall back to
                    # the uncompressed one below.
                    f = None
                else:
                    # Remove any uncompressed cache; it usually does not
                    # exist, which is not an error.
                    try:
                        os.unlink(fn)
                    except OSError:
                        pass
            if f is None:
                f = open(fn, 'r')

            header = f.readline().split(',')
            protocol, fmt_version = [field.strip() for field in header]
            if fmt_version != '0':
                # String exceptions are not valid on modern Python 2;
                # raise a real exception so the message is reported.
                raise ValueError('Unsupported cache format %s' % fmt_version)
            self.__cache = cache_protocols[protocol](f)
        except IOError:
            # Missing or unreadable cache file: start with an empty cache.
            self.__cache = {}
        except Exception:
            # Unexpected errors get printed, then we start empty.
            sys.excepthook(*sys.exc_info())
            self.__cache = {}
        finally:
            # Always release the file handle, whether loading worked or not.
            if f is not None:
                f.close()

    def get(self, fn, st=None):
        """Return the cache entry for the given filename.  st should be
        the stat information for the corresponding file; if it is not
        supplied, it will be determined via the stat() system call.

        If the file is not cached, or if its cached data is no longer
        valid (due to a changed fingerprint), this method will return
        None.  Errors (for instance a failing stat()) are reported
        through sys.excepthook and also result in None."""
        try:
            if st is None:
                st = os.stat(fn)
            info = self.__cache.get(fn)
            if info is None:
                return None
            oldfp, cached = info
            if fingerprint(st) != oldfp:
                return None
            return cached
        except Exception:
            sys.excepthook(*sys.exc_info())
            return None

    def put(self, file, st=None):
        """Replace the entry for the selected file.  file must provide
        an 'fn' attribute (the key) and a get_cache() method (the data
        to store).  st should be the file's stat information; if it is
        not supplied, it will be determined using the stat() system
        call.

        If the file's new information is different from its old
        information, the cache will be dirtied."""
        if st is None:
            st = os.stat(file.fn)
        new_cache_entry = fingerprint(st), file.get_cache()
        if new_cache_entry != self.__cache.get(file.fn):
            self.__dirty = True
            self.__cache[file.fn] = new_cache_entry

    def write(self):
        """Unconditionally write the cache to disk.

        The cache is written gzip-compressed to '<fn>.gz' and any old
        uncompressed cache at '<fn>' is removed.  Errors from creating
        the directory or writing the file propagate to the caller."""
        dn = os.path.dirname(self.fn)
        # dn is '' when fn has no directory part; os.makedirs('') would
        # fail, and the current directory already exists anyway.
        if dn and not os.path.isdir(dn):
            os.makedirs(dn)
        f = gzip.GzipFile('%s.gz' % self.fn, 'w')
        try:
            f.write('pickle,0\n')
            cPickle.dump(self.__cache, f, cPickle.HIGHEST_PROTOCOL)
        finally:
            # Closing is what flushes the gzip trailer; do not leave it
            # to garbage collection.
            f.close()
        # Zap any old uncompressed data.
        try:
            os.unlink(self.fn)
        except OSError:
            pass
        self.__dirty = False

    def flush(self):
        """Write the cache to disk iff it has been modified since it
        was loaded or last written."""
        if self.__dirty:
            self.write()
|