/usr/lib/ruby/vendor_ruby/sequel/extensions/pg_streaming.rb is in ruby-sequel-pg 1.6.10-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Abort loading with a LoadError when Sequel::Postgres does not expose the
# supports_streaming? check at all, or when that check reports that
# streaming is unavailable.
if !Sequel::Postgres.respond_to?(:supports_streaming?)
  raise LoadError, "either sequel_pg not loaded, or an old version of sequel_pg loaded"
elsif !Sequel::Postgres.supports_streaming?
  raise LoadError, "streaming is not supported by the version of libpq in use"
end
# Database methods necessary to support streaming.  You should load this
# extension into your database object:
#
#   DB.extension(:pg_streaming)
#
# Then you can call #stream on your datasets to use the streaming support:
#
#   DB[:table].stream.each{|row| ...}
#
# Or change a setting so that all dataset calls use streaming:
#
#   DB.stream_all_queries = true
module Sequel::Postgres::Streaming
  # Whether all streamable datasets on this database should use streaming,
  # even if #stream was not called on them.  Set to false when the
  # extension is loaded into the database.
  attr_accessor :stream_all_queries

  # Also extend the database's datasets to support streaming.
  # This extension requires modifying connections, so disconnect
  # so that new connections will get the methods.
  def self.extended(db)
    db.extend_datasets(DatasetMethods)
    db.stream_all_queries = false
    db.disconnect
  end

  # Make sure all new connections have the appropriate methods added.
  # Returns the connection produced by super, extended with AdapterMethods.
  def connect(server)
    conn = super
    conn.extend(AdapterMethods)
    conn
  end

  private

  # If streaming is requested, and a prepared statement is not
  # used (sql is not a Symbol), tell the connection to use single row
  # mode for the query.  Execution itself is always delegated to super.
  def _execute(conn, sql, opts={}, &block)
    if opts[:stream] && !sql.is_a?(Symbol)
      conn.single_row_mode = true
    end
    super
  end

  # If streaming is requested, send the prepared statement instead
  # of executing it and blocking.  Otherwise defer to super.
  def _execute_prepared_statement(conn, ps_name, args, opts)
    if opts[:stream]
      conn.send_prepared_statement(ps_name, args)
    else
      super
    end
  end

  # Methods added to each new connection (see #connect) so that a query
  # can be sent in single row mode.
  module AdapterMethods
    # Whether the next query on this connection should use
    # single_row_mode.
    attr_accessor :single_row_mode

    # Send the prepared statement on this connection using
    # single row mode.  Returns the connection itself rather than a
    # result object.
    def send_prepared_statement(ps_name, args)
      send_query_prepared(ps_name, args)
      set_single_row_mode
      # NOTE(review): block is a method of the connection object, presumably
      # waiting until the server has a result available -- confirm.
      block
      self
    end

    private

    # If using single row mode, send the query instead of executing it,
    # and return the connection itself.  Otherwise defer to super.
    def execute_query(sql, args)
      if @single_row_mode
        # The flag applies to a single query only, so clear it before sending.
        @single_row_mode = false
        @db.log_yield(sql, args){args ? send_query(sql, args) : send_query(sql)}
        set_single_row_mode
        block
        self
      else
        super
      end
    end
  end

  # Dataset methods used to implement streaming.
  module DatasetMethods
    # If streaming has been requested and the current dataset
    # can be streamed, request the database use streaming when
    # executing this query, and use yield_each_row to process
    # the separate PGresult for each row in the connection.
    # Otherwise defer to super.
    def fetch_rows(sql)
      if stream_results?
        execute(sql, :stream=>true) do |conn|
          yield_each_row(conn){|h| yield h}
        end
      else
        super
      end
    end

    # Use streaming to implement paging.  The +opts+ argument is accepted
    # but not consulted by the streaming implementation; it is only passed
    # along when building the enumerator.  Returns an enumerator if no block
    # is given.
    #
    # The default is spelled Sequel::OPTS because this module is opened with
    # the compact <tt>module Sequel::Postgres::Streaming</tt> form, which
    # leaves Sequel out of the lexical constant scope, so a bare OPTS would
    # raise NameError when the method is called without arguments.
    def paged_each(opts=Sequel::OPTS, &block)
      return enum_for(:paged_each, opts) unless block
      stream.each(&block)
    end

    # Return a clone of the dataset that will use streaming to load
    # rows.
    def stream
      clone(:stream=>true)
    end

    private

    # Only stream results if streaming has been specifically requested
    # (on this dataset or for the whole database) and the query is
    # streamable.
    def stream_results?
      (@opts[:stream] || db.stream_all_queries) && streamable?
    end

    # Queries using cursors are not streamable, and queries that use
    # the map/select_map/to_hash/to_hash_groups optimizations are not
    # streamable, but other queries are streamable.
    def streamable?
      spgt = (o = @opts)[:_sequel_pg_type]
      # Only an unset type or :model is compatible with streaming.
      (spgt.nil? || spgt == :model) && !o[:cursor]
    end
  end
end
# Register this module under the :pg_streaming name, the name used by
# DB.extension(:pg_streaming) in the usage notes in this file.
Sequel::Database.register_extension(:pg_streaming, Sequel::Postgres::Streaming)
|