/usr/lib/python3/dist-packages/asyncssh/compression.py is in python3-asyncssh 1.3.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (c) 2013-2015 by Ron Frederick <ronf@timeheart.net>.
# All rights reserved.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v1.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""SSH compression handlers"""
import zlib
# Names of the registered algorithms, in the order they were registered
_cmp_algs = []
# Algorithm name -> after_auth flag supplied at registration
_cmp_params = {}
# Algorithm name -> callable invoked to create a new compressor
_cmp_compressors = {}
# Algorithm name -> callable invoked to create a new decompressor
_cmp_decompressors = {}
def _none():
    """Factory registered for the 'none' algorithm

    It is used as both the compressor and the decompressor factory and
    always produces None rather than a handler object.
    """

    # Spelled out explicitly: None is the value callers receive
    return None
class _ZLibCompress:
    """zlib compressor which performs a sync flush after every call

    A single zlib compression stream is kept for the lifetime of the
    object, and each call to compress() returns all output produced so
    far, terminated by a Z_SYNC_FLUSH.
    """

    def __init__(self):
        self._comp = zlib.compressobj()

    def compress(self, data):
        """Return data compressed with zlib, followed by a sync flush"""

        stream = self._comp

        # Order matters: the input must be fed in before flushing
        compressed = stream.compress(data)
        flushed = stream.flush(zlib.Z_SYNC_FLUSH)

        return compressed + flushed
def register_compression_alg(alg, compressor, decompressor, after_auth):
    """Register a compression algorithm

    The algorithm name is added to the list returned by
    get_compression_algs(), and its handlers and after_auth flag are
    recorded for later lookup by get_compressor(), get_decompressor()
    and get_compression_params().

    alg is the name of the algorithm. compressor and decompressor are
    callables taking no arguments which return a new compressor or
    decompressor object. after_auth indicates whether compression
    should be delayed until after authentication completes.

    Registering a name which is already present replaces its handlers
    and flag but leaves its position in the list unchanged.
    """

    # Guard against a repeated registration listing the name twice
    if alg not in _cmp_algs:
        _cmp_algs.append(alg)

    _cmp_params[alg] = after_auth
    _cmp_compressors[alg] = compressor
    _cmp_decompressors[alg] = decompressor
def get_compression_algs():
    """Return a list of available compression algorithms

    The names are listed in the order in which they were registered.
    A new list is returned on each call, so callers may modify the
    result without affecting the set of registered algorithms.
    """

    # Copy so the module-level registry is never aliased by callers
    return list(_cmp_algs)
def get_compression_params(alg):
    """Look up the parameters of a compression algorithm

    The value returned is the flag recorded when the algorithm was
    registered, indicating whether or not its use should be delayed
    until after authentication completes.

    A KeyError is raised if the algorithm has not been registered.
    """

    after_auth = _cmp_params[alg]
    return after_auth
def get_compressor(alg):
    """Create a compressor for the requested algorithm

    The factory registered for the algorithm is called with no
    arguments and whatever it returns is handed back to the caller,
    so each call yields the result of a fresh invocation.

    A KeyError is raised if the algorithm has not been registered.
    """

    make_compressor = _cmp_compressors[alg]
    return make_compressor()
def get_decompressor(alg):
    """Create a decompressor for the requested algorithm

    The factory registered for the algorithm is called with no
    arguments and whatever it returns is handed back to the caller,
    so each call yields the result of a fresh invocation.

    A KeyError is raised if the algorithm has not been registered.
    """

    make_decompressor = _cmp_decompressors[alg]
    return make_decompressor()
# pylint: disable=bad-whitespace

# Built-in algorithms. get_compression_algs() lists them in the order
# they are registered here. Both zlib variants share the same handlers;
# only 'zlib@openssh.com' is flagged to be delayed until after
# authentication completes.
register_compression_alg(alg=b'zlib@openssh.com',
                         compressor=_ZLibCompress,
                         decompressor=zlib.decompressobj,
                         after_auth=True)
register_compression_alg(alg=b'zlib',
                         compressor=_ZLibCompress,
                         decompressor=zlib.decompressobj,
                         after_auth=False)
register_compression_alg(alg=b'none',
                         compressor=_none,
                         decompressor=_none,
                         after_auth=False)
|