class Irc::Bot::Journal::Storage::PostgresStorage

Attributes

conn[R]

Public Class Methods

new(opts={}) click to toggle source
# File lib/rbot/journal/postgres.rb, line 95
def initialize(opts={})
  @uri = opts[:uri] || 'postgresql://localhost/rbot'
  @conn = PGWrapper.new(@uri)
  @conn.exec('set client_min_messages = warning')
  @conn.exec(CREATE_INDEX)
  @version = @conn.exec('SHOW server_version;')[0]['server_version']

  @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
  log 'journal storage: postgresql connected to version: ' + @version
  
  version = @version.split('.')[0,3].join.to_i
  if version < 930
    raise StorageError.new(
      'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
  end
  @jsonb = (version >= 940)
  log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
  log 'journal storage: postgres backend is using JSONB :)' if @jsonb

  drop if opts[:drop]
  create_table
  create_index('topic_index', 'topic')
  create_index('timestamp_index', 'timestamp')
end

Public Instance Methods

count(query=nil) click to toggle source

returns the number of messages that match the query

# File lib/rbot/journal/postgres.rb, line 174
def count(query=nil)
  if query
    sql, params = query_to_sql(query)
    sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
  else
    sql = 'SELECT COUNT(*) FROM journal'
    params = []
  end
  res = @conn.exec_params(sql, params)
  res[0]['count'].to_i
end
create_index(index_name, column_name) click to toggle source
# File lib/rbot/journal/postgres.rb, line 129
def create_index(index_name, column_name)
  debug 'journal postges backend: create index %s for %s' % [
    index_name, column_name]
  @conn.exec_params('SELECT create_index($1, $2, $3)', [
    'journal', index_name, column_name])
end
create_payload_index(key) click to toggle source
# File lib/rbot/journal/postgres.rb, line 136
def create_payload_index(key)
  index_name = 'idx_payload_' + key.gsub('.', '_')
  column = sql_payload_selector(key)
  create_index(index_name, column)
end
create_table() click to toggle source
# File lib/rbot/journal/postgres.rb, line 120
def create_table
  @conn.exec('
    CREATE TABLE IF NOT EXISTS journal
      (id UUID PRIMARY KEY,
       topic TEXT NOT NULL,
       timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
       payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
end
drop() click to toggle source
# File lib/rbot/journal/postgres.rb, line 197
def drop
  @conn.exec('DROP TABLE journal;') rescue nil
end
ensure_index(key) click to toggle source
# File lib/rbot/journal/postgres.rb, line 142
def ensure_index(key)
  create_payload_index(key)
end
find(query=nil, limit=100, offset=0, &block) click to toggle source
# File lib/rbot/journal/postgres.rb, line 151
def find(query=nil, limit=100, offset=0, &block)
  def to_message(row)
    timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
    JournalMessage.new(id: row['id'], timestamp: timestamp,
      topic: row['topic'], payload: JSON.parse(row['payload']))
  end

  if query
    sql, params = query_to_sql(query)
    sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
  else
    sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
    params = []
  end
  res = @conn.exec_params(sql, params)
  if block_given?
    res.each { |row| block.call(to_message(row)) }
  else
    res.map { |row| to_message(row) }
  end
end
insert(m) click to toggle source
# File lib/rbot/journal/postgres.rb, line 146
def insert(m)
  @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
    [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
end
query_to_sql(query) click to toggle source
# File lib/rbot/journal/postgres.rb, line 214
def query_to_sql(query)
  params = []
  placeholder = Proc.new do |value|
    params << value
    '$%d' % [params.length]
  end
  sql = {op: 'AND', list: []}

  # ID query OR condition
  unless query.id.empty?
    sql[:list] << {
      op: 'OR',
      list: query.id.map { |id| 
        'id = ' + placeholder.call(id)
      }
    }
  end

  # Topic query OR condition
  unless query.topic.empty?
    sql[:list] << {
      op: 'OR',
      list: query.topic.map { |topic| 
        'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
      }
    }
  end

  # Timestamp range query AND condition
  if query.timestamp[:from] or query.timestamp[:to]
    list = []
    if query.timestamp[:from]
      list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
    end
    if query.timestamp[:to]
      list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
    end
    sql[:list] << {
      op: 'AND',
      list: list
    }
  end

  # Payload query
  unless query.payload.empty?
    list = []
    query.payload.each_pair do |key, value|
      selector = sql_payload_selector(key)
      list << selector + ' = ' + placeholder.call(value)
    end
    sql[:list] << {
      op: 'OR',
      list: list
    }
  end

  sql = sql[:list].map { |stmt|
    '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
  }.join(' %s ' % [sql[:op]])

  [sql, params]
end
remove(query=nil) click to toggle source
# File lib/rbot/journal/postgres.rb, line 186
def remove(query=nil)
  if query
    sql, params = query_to_sql(query)
    sql = 'DELETE FROM journal WHERE ' + sql
  else
    sql = 'DELETE FROM journal;'
    params = []
  end
  res = @conn.exec_params(sql, params)
end
sql_payload_selector(key) click to toggle source
# File lib/rbot/journal/postgres.rb, line 201
def sql_payload_selector(key)
  selector = 'payload'
  k = key.to_s.split('.')
  k.each_index { |i|
    if i >= k.length-1
      selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
    else
      selector += '->\'%s\'' % [@conn.escape_string(k[i])]
    end
  }
  selector
end
to_message(row) click to toggle source
# File lib/rbot/journal/postgres.rb, line 152
def to_message(row)
  timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
  JournalMessage.new(id: row['id'], timestamp: timestamp,
    topic: row['topic'], payload: JSON.parse(row['payload']))
end