diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..38fd6a7 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,19 @@ +version: 2.1 + +orbs: + ruby-orbs: sue445/ruby-orbs@volatile + +jobs: + test: + docker: + - image: cimg/ruby:3.0 + steps: + - checkout + - ruby-orbs/bundle-install: + gemspec_name: fluent-plugin-time-series-counter + with_gemfile_lock: false + - run: bundle exec rake test +workflows: + test: + jobs: + - test diff --git a/Rakefile b/Rakefile index 809eb56..97055e7 100644 --- a/Rakefile +++ b/Rakefile @@ -1,2 +1,11 @@ require "bundler/gem_tasks" +require 'rake/testtask' +Rake::Task[:release].clear + +Rake::TestTask.new(:test) do |t| + t.libs << 'lib' << 'test' + t.pattern = 'test/**/test_*.rb' +end + +task default: :test diff --git a/fluent-plugin-time-series-counter.gemspec b/fluent-plugin-time-series-counter.gemspec index 342d85b..76b975d 100644 --- a/fluent-plugin-time-series-counter.gemspec +++ b/fluent-plugin-time-series-counter.gemspec @@ -16,6 +16,8 @@ Gem::Specification.new do |spec| spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ["lib"] - spec.add_development_dependency "bundler", "~> 1.7" - spec.add_development_dependency "rake", "~> 10.0" + spec.add_dependency 'fluentd', '>= 1', '< 2' + + spec.add_development_dependency 'rake' + spec.add_development_dependency 'test-unit' end diff --git a/lib/fluent/plugin/out_time_series_counter.rb b/lib/fluent/plugin/out_time_series_counter.rb index 62daae1..6193978 100644 --- a/lib/fluent/plugin/out_time_series_counter.rb +++ b/lib/fluent/plugin/out_time_series_counter.rb @@ -1,113 +1,102 @@ -module Fluent - class TimeSeriesCounter < Fluent::BufferedOutput - Fluent::Plugin.register_output('time_series_counter', self) - - unless method_defined?(:log) - define_method('log') { $log } +require 'fluent/plugin/output' + +class Fluent::Plugin::TimeSeriesCounter < Fluent::Plugin::Output + Fluent::Plugin.register_output('time_series_counter', self) + helpers :event_emitter + + config_param :tag, :string, default: "tsc" + config_param :count_keys, :array, value_type: :string, alias: :count_key + config_param :count_key_delimiter, :string, default: ":" + config_param :count_value_name, :string, default: "count" + config_param :unit, :array, value_type: :string + config_param :uniq_key, :string, default: "tsc_key" + config_param :unit_key, :string, default: "tsc_unit" + config_param :time_key, :string, default: "tsc_time" + + def configure(conf) + super + + @units = unit.inject({}) do |hash, i| + hash[i] = true + hash end + end - config_param :tag, :string, :default => "tsc" - config_param :count_key, :string, :default => nil - config_param :count_key_delimiter, :string, :default => ":" - config_param :count_value_name, :string, :default => "count" - config_param :unit, :string, :default => nil - config_param :uniq_key, :string, :default => "tsc_key" - config_param :unit_key, :string, :default => "tsc_unit" - config_param :time_key, :string, :default => "tsc_time" - - def initialize - super - end + def formatted_to_msgpack_binary? + true + end - def configure(conf) - super - if !count_key - raise ConfigError, "out_time_series_counter: required 'count_key' parameter." - end + def format(tag, time, record) + [tag, time, record].to_msgpack + end - if !unit - raise ConfigError, "out_time_series_counter: required 'unit' parameter." + def write(chunk) + stats = {} + + chunk.msgpack_each do |tag, time, record| + skip = false + + @count_keys.each do |k| + # skip record if a record does not have requried count_keys + skip = true unless record[k] end + next if skip - @count_keys = count_key.split(/\s*,\s*/).sort - @unit = unit.split(/\s*,\s*/).inject({}) do |hash, i| - hash[i] = true - hash + if @units['min'] + count(stats, record, time, "min") end - end - def format(tag, time, record) - [tag, time, record].to_msgpack - end + if @units['hour'] + count(stats, record, time, "hour") + end - def write(chunk) - stats = {} - chunk.msgpack_each do |tag, time, record| - skip = false - next unless time - @count_keys.each do |k| - # skip record if a record does not have requried count_keys - skip = true unless record[k] - end - next if skip - - if @unit['min'] - count(stats, record, time, "min") - end - - if @unit['hour'] - count(stats, record, time, "hour") - end - - if @unit['day'] - count(stats, record, time, "day") - end + if @units['day'] + count(stats, record, time, "day") end + end + output_stats(stats) + end - output_stats(stats) + private + def create_uniq_key(record, unit, time) + uniq_key = [] + @count_keys.each do |k| + uniq_key << record[k] end + uniq_key << time.to_s + uniq_key << unit + uniq_key.join(@count_key_delimiter) + end - private - def create_uniq_key(record, unit, time) - uniq_key = [] - @count_keys.each do |k| - uniq_key << record[k] - end - uniq_key << time.to_s - uniq_key << unit - uniq_key.join(@count_key_delimiter) + def count(stats, record, time, unit) + case unit + when "min" + unit_time = time - (time % 60) + when "hour" + unit_time = time - (time % 3600) + when "day" + unit_time = time - (time % 86400) + else + return end - def count(stats, record, time, unit) - unix_time = 0 - case unit - when "min" - unit_time = time - (time % 60) - when "hour" - unit_time = time - (time % 3600) - when "day" - unit_time = time - (time % 86400) - else - return - end - tsc_key = create_uniq_key(record, unit, unit_time) - unless stats[tsc_key] - stats[tsc_key] = {@count_value_name => 0} unless stats[tsc_key] - @count_keys.each do |k| - stats[tsc_key][k] = record[k] - end - stats[tsc_key][@unit_key] = unit - stats[tsc_key][@time_key] = unit_time + tsc_key = create_uniq_key(record, unit, unit_time) + unless stats[tsc_key] + stats[tsc_key] = {@count_value_name => 0} unless stats[tsc_key] + @count_keys.each do |k| + stats[tsc_key][k] = record[k] end - stats[tsc_key][@count_value_name] += 1 + stats[tsc_key][@unit_key] = unit + stats[tsc_key][@time_key] = unit_time end + stats[tsc_key][@count_value_name] += 1 + end - def output_stats(stats) - stats.each do |k, v| - v[@uniq_key] = k - Fluent::Engine.emit("#{@tag}", Fluent::Engine.now, v) - end + def output_stats(stats) + stats.each do |k, v| + v[@uniq_key] = k + router.emit(@tag, Fluent::Engine.now, v) end end end diff --git a/test/helper.rb b/test/helper.rb new file mode 100644 index 0000000..f9e989a --- /dev/null +++ b/test/helper.rb @@ -0,0 +1,8 @@ +$LOAD_PATH.unshift(File.expand_path("../../", __FILE__)) +require "test-unit" +require "fluent/test" +require "fluent/test/driver/output" +require "fluent/test/helpers" + +Test::Unit::TestCase.include(Fluent::Test::Helpers) +Test::Unit::TestCase.extend(Fluent::Test::Helpers) diff --git a/test/plugin/test_out_time_series_counter.rb b/test/plugin/test_out_time_series_counter.rb new file mode 100644 index 0000000..8f59644 --- /dev/null +++ b/test/plugin/test_out_time_series_counter.rb @@ -0,0 +1,151 @@ +require 'helper' + +require 'fluent/plugin/out_time_series_counter' + +class TimeSeriesCounterTest < Test::Unit::TestCase + include Fluent::Test::Helpers + + def setup + Fluent::Test.setup # Setup test for Fluentd (Required) + end + + CONFIG = %[ + count_key id + unit hour + ] + + def create_driver(conf = CONFIG) + Fluent::Test::Driver::Output.new(Fluent::Plugin::TimeSeriesCounter).configure(conf) + end + + sub_test_case 'configuration' do + test 'basic configuration' do + d = create_driver + assert_equal ['id'], d.instance.count_keys + assert_equal ['hour'], d.instance.unit + end + + test 'invalid configration' do + assert_raise(Fluent::ConfigError) { + create_driver( + <<~EOS + count_key id + EOS + ) + } + + assert_raise(Fluent::ConfigError) { + create_driver( + <<~EOS + unit hour + EOS + ) + } + end + end + + sub_test_case 'count' do + test 'count per count_key' do + d = create_driver + d.run(default_tag: 'test') do + d.feed({'id' => 1}) + d.feed({'id' => 1}) + d.feed({'id' => 2}) + end + + # result == uniq(count_key) + assert_equal(2, d.events.size) + + results = d.events.flat_map{|_key, _time, result| result } + + result = results.find{ |r| r["id"] == 1 } + assert_equal(2, result["count"]) + + result = results.find{ |r| r["id"] == 2 } + assert_equal(1, result["count"]) + end + + test 'count per unit(hour)' do + d = create_driver + + d.run(default_tag: 'test') do + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'id' => 1}) + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'id' => 1}) + + d.feed(event_time('2022-09-01 11:00:00 UTC'), {'id' => 1}) + end + + # result == uniq(count_key) + assert_equal(2, d.events.size) + + results = d.events.flat_map{|_key, _time, result| result } + + # Time.at(1662026400) => 2022-09-01 10:00:00 UTC + result = results.find{ |r| r["tsc_time"] == 1662026400 } + assert_equal(2, result["count"]) + + # Time.at(1662030000) => 2022-09-01 11:00:00 UTC + result = results.find{ |r| r["tsc_time"] == 1662030000 } + assert_equal(1, result["count"]) + end + + test 'count only feeds with all keys' do + d = create_driver( + <<~EOS + count_key a,b,c + unit hour + EOS + ) + + d.run(default_tag: 'test') do + # no count + d.feed({a: 1}) + d.feed({a: 1, b: 2}) + d.feed({a: 1, b: 2}) + + # count + d.feed({a: 1, b: 2, c: 3}) + d.feed({a: 1, b: 2, c: 3}) + d.feed({a: 1, b: 2, c: 3}) + end + + assert_equal(1, d.events.size) + + _key, _time, record = d.events.first + + assert_equal(3, record['count']) + end + + test 'count per units' do + d = create_driver( + <<~EOS + count_key id + unit min,hour,day + EOS + ) + + d.run(default_tag: 'test') do + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'id' => 1}) + d.feed(event_time('2022-09-01 10:01:00 UTC'), {'id' => 1}) + + d.feed(event_time('2022-09-01 11:00:00 UTC'), {'id' => 1}) + + d.feed(event_time('2022-09-02 10:00:00 UTC'), {'id' => 1}) + d.feed(event_time('2022-09-02 10:00:00 UTC'), {'id' => 1}) + d.feed(event_time('2022-09-02 10:00:00 UTC'), {'id' => 1}) + + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'id' => 2}) + end + + results = d.events.map{|_,_,r| r} + + assert_equal(4, results.count{|r| r['id'] == 1 && r['tsc_unit'] == 'min'}) + assert_equal(3, results.count{|r| r['id'] == 1 && r['tsc_unit'] == 'hour'}) + assert_equal(2, results.count{|r| r['id'] == 1 && r['tsc_unit'] == 'day'}) + + assert_equal(1, results.count{|r| r['id'] == 2 && r['tsc_unit'] == 'min'}) + assert_equal(1, results.count{|r| r['id'] == 2 && r['tsc_unit'] == 'hour'}) + assert_equal(1, results.count{|r| r['id'] == 2 && r['tsc_unit'] == 'day'}) + end + end +end