/usr/share/pyshared/happybase/connection.py is in python-happybase 0.8-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 | # coding: UTF-8
"""
HappyBase connection module.
"""
import logging
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TFramedTransport
from thrift.protocol import TBinaryProtocol
from .hbase import Hbase
from .hbase.ttypes import ColumnDescriptor
from .table import Table
from .util import pep8_to_camel_case
logger = logging.getLogger(__name__)

# Defaults used by Connection when the caller omits (or passes None for)
# the corresponding constructor argument.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 9090
DEFAULT_TRANSPORT = 'buffered'
DEFAULT_COMPAT = '0.96'

# Accepted values for the ``compat`` argument of Connection; each names
# an HBase version whose Thrift interface this package can speak.
COMPAT_MODES = ('0.90', '0.92', '0.94', '0.96')

# Maps the user-facing ``transport`` argument of Connection to the
# Thrift transport class that wraps the TCP socket.
THRIFT_TRANSPORTS = {
    'buffered': TBufferedTransport,
    'framed': TFramedTransport,
}
class Connection(object):
    """Connection to an HBase Thrift server.

    The `host` and `port` arguments specify the host name and TCP port
    of the HBase Thrift server to connect to. If omitted or ``None``,
    a connection to the default port on ``localhost`` is made. If
    specified, the `timeout` argument specifies the socket timeout in
    milliseconds.

    If `autoconnect` is `True` (the default) the connection is made
    directly, otherwise :py:meth:`Connection.open` must be called
    explicitly before first use.

    The optional `table_prefix` and `table_prefix_separator` arguments
    specify a prefix and a separator string to be prepended to all table
    names, e.g. when :py:meth:`Connection.table` is invoked. For
    example, if `table_prefix` is ``myproject``, all tables will
    have names like ``myproject_XYZ``.

    The optional `compat` argument sets the compatibility level for
    this connection. Older HBase versions have slightly different Thrift
    interfaces, and using the wrong protocol can lead to crashes caused
    by communication errors, so make sure to use the correct one. This
    value can be either the string ``0.90``, ``0.92``, ``0.94``, or
    ``0.96`` (the default).

    The optional `transport` argument specifies the Thrift transport
    mode to use. Supported values for this argument are ``buffered``
    (the default) and ``framed``. Make sure to choose the right one,
    since otherwise you might see non-obvious connection errors or
    program hangs when making a connection. HBase versions before 0.94
    always use the buffered transport. Starting with HBase 0.94, the
    Thrift server optionally uses a framed transport, depending on the
    argument passed to the ``hbase-daemon.sh start thrift`` command.
    The default ``-threadpool`` mode uses the buffered transport; the
    ``-hsha``, ``-nonblocking``, and ``-threadedselector`` modes use the
    framed transport.

    .. versionadded:: 0.5
       `timeout` argument

    .. versionadded:: 0.4
       `table_prefix_separator` argument

    .. versionadded:: 0.4
       support for framed Thrift transports

    :param str host: The host to connect to
    :param int port: The port to connect to
    :param int timeout: The socket timeout in milliseconds (optional)
    :param bool autoconnect: Whether the connection should be opened directly
    :param str table_prefix: Prefix used to construct table names (optional)
    :param str table_prefix_separator: Separator used for `table_prefix`
    :param str compat: Compatibility mode (optional)
    :param str transport: Thrift transport mode (optional)
    :raises ValueError: if `transport` or `compat` is not a supported value
    :raises TypeError: if `table_prefix` (when given) or
        `table_prefix_separator` is not a string
    """

    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, timeout=None,
                 autoconnect=True, table_prefix=None,
                 table_prefix_separator='_', compat=DEFAULT_COMPAT,
                 transport=DEFAULT_TRANSPORT):

        if transport not in THRIFT_TRANSPORTS:
            # Sort the choices: dict key order is arbitrary, and an error
            # message should not change from run to run.
            raise ValueError("'transport' must be one of %s"
                             % ", ".join(sorted(THRIFT_TRANSPORTS)))

        if table_prefix is not None \
                and not isinstance(table_prefix, basestring):
            raise TypeError("'table_prefix' must be a string")

        if not isinstance(table_prefix_separator, basestring):
            raise TypeError("'table_prefix_separator' must be a string")

        if compat not in COMPAT_MODES:
            raise ValueError("'compat' must be one of %s"
                             % ", ".join(COMPAT_MODES))

        # Allow host and port to be None, which may be easier for
        # applications wrapping a Connection instance. (Any falsy value,
        # e.g. '' or 0, also falls back to the default.)
        self.host = host or DEFAULT_HOST
        self.port = port or DEFAULT_PORT
        self.timeout = timeout
        self.table_prefix = table_prefix
        self.table_prefix_separator = table_prefix_separator
        self.compat = compat

        self._transport_class = THRIFT_TRANSPORTS[transport]
        self._refresh_thrift_client()

        if autoconnect:
            self.open()

        # Marker checked by __del__(): it is only set once construction
        # has fully succeeded, so a half-built instance is never closed.
        self._initialized = True

    def _refresh_thrift_client(self):
        """Refresh the Thrift socket, transport, and client.

        Builds a new socket (applying `self.timeout` if set), wraps it in
        the configured transport class, and creates a new HBase Thrift
        client on top of a binary protocol. The transport is not opened.
        """
        socket = TSocket(self.host, self.port)
        if self.timeout is not None:
            socket.setTimeout(self.timeout)

        self.transport = self._transport_class(socket)
        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
        self.client = Hbase.Client(protocol)

    def _table_name(self, name):
        """Construct a table name by optionally adding a table name prefix.

        :param str name: the unprefixed table name
        :return: ``name`` unchanged when no prefix is configured, otherwise
            ``table_prefix + table_prefix_separator + name``
        :rtype: str
        """
        if self.table_prefix is None:
            return name

        return self.table_prefix + self.table_prefix_separator + name

    def open(self):
        """Open the underlying transport to the HBase instance.

        This method opens the underlying Thrift transport (TCP connection).
        Calling it when the transport is already open does nothing.
        """
        if self.transport.isOpen():
            return

        logger.debug("Opening Thrift transport to %s:%d", self.host, self.port)
        self.transport.open()

    def close(self):
        """Close the underlying transport to the HBase instance.

        This method closes the underlying Thrift transport (TCP connection).
        Calling it when the transport is not open does nothing.
        """
        if not self.transport.isOpen():
            return

        if logger is not None:
            # If called from __del__(), module variables may no longer
            # exist.
            logger.debug(
                "Closing Thrift transport to %s:%d",
                self.host, self.port)

        self.transport.close()

    def __del__(self):
        try:
            self._initialized
        except AttributeError:
            # Failure from constructor
            return
        else:
            self.close()

    def table(self, name, use_prefix=True):
        """Return a table object.

        Returns a :py:class:`happybase.Table` instance for the table
        named `name`. This does not result in a round-trip to the
        server, and the table is not checked for existence.

        The optional `use_prefix` argument specifies whether the table
        prefix (if any) is prepended to the specified `name`. Set this
        to `False` if you want to use a table that resides in another
        'prefix namespace', e.g. a table from a 'friendly' application
        co-hosted on the same HBase instance. See the `table_prefix`
        argument to the :py:class:`Connection` constructor for more
        information.

        :param str name: the name of the table
        :param bool use_prefix: whether to use the table prefix (if any)
        :return: Table instance
        :rtype: :py:class:`Table`
        """
        if use_prefix:
            name = self._table_name(name)

        return Table(name, self)

    #
    # Table administration and maintenance
    #

    def tables(self):
        """Return a list of table names available in this HBase instance.

        If a `table_prefix` was set for this :py:class:`Connection`, only
        tables that have the specified prefix will be listed, and the
        prefix (including separator) is stripped from the returned names.

        :return: The table names
        :rtype: List of strings
        """
        names = self.client.getTableNames()

        # Filter using prefix, and strip prefix from names
        if self.table_prefix is not None:
            prefix = self._table_name('')
            offset = len(prefix)
            names = [n[offset:] for n in names if n.startswith(prefix)]

        return names

    def create_table(self, name, families):
        """Create a table.

        :param str name: The table name
        :param dict families: The name and options for each column family
        :raises TypeError: if `families` is not a dictionary
        :raises ValueError: if `families` is empty

        The `families` argument is a dictionary mapping column family
        names to a dictionary containing the options for this column
        family, e.g.

        ::

            families = {
                'cf1': dict(max_versions=10),
                'cf2': dict(max_versions=1, block_cache_enabled=False),
                'cf3': dict(),  # use defaults
            }
            connection.create_table('mytable', families)

        These options correspond to the ColumnDescriptor structure in
        the Thrift API, but note that the names should be provided in
        Python style, not in camel case notation, e.g. `time_to_live`,
        not `timeToLive`. The following options are supported:

        * ``max_versions`` (`int`)
        * ``compression`` (`str`)
        * ``in_memory`` (`bool`)
        * ``bloom_filter_type`` (`str`)
        * ``bloom_filter_vector_size`` (`int`)
        * ``bloom_filter_nb_hashes`` (`int`)
        * ``block_cache_enabled`` (`bool`)
        * ``time_to_live`` (`int`)
        """
        name = self._table_name(name)
        if not isinstance(families, dict):
            raise TypeError("'families' arg must be a dictionary")

        if not families:
            raise ValueError(
                "Cannot create table %r (no column families specified)"
                % name)

        column_descriptors = []
        for cf_name, options in families.iteritems():
            # A family mapped to None is treated like an empty options
            # dict, i.e. server defaults.
            if options is None:
                options = dict()

            # Translate PEP 8 option names into the camelCase keyword
            # names used by the Thrift ColumnDescriptor.
            kwargs = dict()
            for option_name, value in options.iteritems():
                kwargs[pep8_to_camel_case(option_name)] = value

            # Normalise the family name to end with ':'; this always
            # overrides any 'name' entry that came from the options.
            if not cf_name.endswith(':'):
                cf_name += ':'
            kwargs['name'] = cf_name

            column_descriptors.append(ColumnDescriptor(**kwargs))

        self.client.createTable(name, column_descriptors)

    def delete_table(self, name, disable=False):
        """Delete the specified table.

        .. versionadded:: 0.5
           `disable` argument

        In HBase, a table always needs to be disabled before it can be
        deleted. If the `disable` argument is `True`, this method first
        disables the table if it wasn't already and then deletes it.

        :param str name: The table name
        :param bool disable: Whether to first disable the table if needed
        """
        # The helper methods apply the table prefix themselves, so they
        # must receive the unprefixed name.
        if disable and self.is_table_enabled(name):
            self.disable_table(name)

        name = self._table_name(name)
        self.client.deleteTable(name)

    def enable_table(self, name):
        """Enable the specified table.

        :param str name: The table name
        """
        name = self._table_name(name)
        self.client.enableTable(name)

    def disable_table(self, name):
        """Disable the specified table.

        :param str name: The table name
        """
        name = self._table_name(name)
        self.client.disableTable(name)

    def is_table_enabled(self, name):
        """Return whether the specified table is enabled.

        :param str name: The table name

        :return: whether the table is enabled
        :rtype: bool
        """
        name = self._table_name(name)
        return self.client.isTableEnabled(name)

    def compact_table(self, name, major=False):
        """Compact the specified table.

        :param str name: The table name
        :param bool major: Whether to perform a major compaction
            (``majorCompact``) instead of a regular one (``compact``).
        """
        name = self._table_name(name)
        if major:
            self.client.majorCompact(name)
        else:
            self.client.compact(name)
|