require './test.rb'

module TestConcurrency

class TestObject
  attr_accessor :x, :last_writer, :last_write_transaction
  def initialize(max_size = nil)
    if max_size
      @random_stuff = (0...rand(max_size)).map {|i| "#{i}"}
        # so the object has variable size
    end
    @x = 0 # seems to be stored after the first attr, so its position changes
           # this is almost certainly dependent on the hashing alg.
  end
  def inspect
    "<TestObject:@x = #{@x}, @last_writer = #{@last_writer}," +
    " @last_write_transaction = #{@last_write_transaction}>"
  end
  def to_s
    inspect
  end
end

def self.run_thread(db, thread, paths, rep_count, object_size, max_sleep)

  rep_count.times do |iter|
    thread[:iter] = iter
    path = paths[rand(paths.size)]

    case rand(100)

    when 0..49
      db.browse path do |tester|
        if $test
          __x__ = tester.x

          sleep(rand*max_sleep) if max_sleep

          unless __x__ == tester.x
            fail "browse test: x == #{__x__} but tester.x == #{tester.x}"
          end
        end

        thread[:transactions] += 1
      end

    when 50..69
      db.edit path do |tester|
        if $test
          tester.x += 1
          __x__ = tester.x

          sleep(rand*max_sleep) if max_sleep

          unless __x__ == tester.x
            fail "edit test: x == #{__x__} but tester.x == #{tester.x}"
          end

          tester.last_write_transaction = :EDIT
          tester.last_writer = "#{Process.pid}, #{thread[:number]}"
        else
          tester.x += 1
        end

        thread[:increments][path] += 1
        thread[:transactions] += 1
      end

    when 70..90
      db.replace path do |tester|
        if $test
          tester.x += 1
          __x__ = tester.x

          sleep(rand*max_sleep) if max_sleep

          unless __x__ == tester.x
            fail "replace test: x == #{__x__} but tester.x == #{tester.x}"
          end

          # this is what replace lets us do:
          old_tester = tester
          tester = TestObject.new(object_size)
          tester.x = old_tester.x
          old_tester.last_write_transaction = :GARBAGE

          tester.last_write_transaction = :REPLACE
          tester.last_writer = "#{Process.pid}, #{thread[:number]}"
        else
          tester.x += 1
        end

        thread[:increments][path] += 1
        thread[:transactions] += 1

        tester # replace needs to know the new object
      end

    else
      if $test
        sleep(rand*max_sleep) if max_sleep
        db.clear_cache
      end

    end

  end
end

def self.go(db,
            process_count =    10,
            thread_count  =    10,
            rep_count     =  1000,
            object_count  =    10,
            object_size   =    $test ? 10 : nil,
            max_sleep     =   nil)

  srand(Process.pid)

  @test_dir = "TestConcurrency#{Process.pid}/"

  paths = (0...object_count).map {|i| "#{@test_dir}/TestObjects/#{i}"}

  paths.each do |path|
    db[path] = TestObject.new
  end

  processes = (0...process_count).map do |process_idx|
    fork do
      profile if $profile and process_idx == 0

      srand(Process.pid)

      start_time = Process.times
      threads = (0...thread_count).map do |thread_index|
        Thread.new(thread_index) do |ti|
          thread = Thread.current
          thread[:number] = ti
          thread[:transactions] = 0
          thread[:increments] = Hash.new(0)

          begin
            run_thread(db, thread, paths, rep_count, object_size,
                       max_sleep && max_sleep/1000.0)
          rescue Exception
            last_msgs 50
            $stderr.puts "*** Process #{Process.pid}, thread #{ti}:"
            raise
          end
        end
      end

      # could start up a monitor thread here to periodically
      # update the results in the database

      threads.each { |thread| thread.join }
      finish_time = Process.times

      utime = finish_time.utime - start_time.utime
      stime = finish_time.stime - start_time.stime
      total_transactions = threads.inject(0) do |sum, thread|
        sum += thread[:transactions]
      end
      rate = total_transactions / (utime + stime)

      total_increments = Hash.new(0)
      threads.each do |thread|
        thread[:increments].each do |path, inc|
          total_increments[path] += inc
        end
      end

      db["#{@test_dir}/Results/#{Process.pid}"] = {
        :utime => utime,
        :stime => stime,
        :transactions => total_transactions,
        :rate  => rate,
        :increments => total_increments
      }
    end
  end

  monitor_thread = Thread.new do
    i = 0
    loop do
      puts i
      sleep 1
      i += 1
    end
  end

  results = (0...process_count).map do
    pid = Process.wait
    db["#{@test_dir}/Results/#{pid}"] or raise "Empty result"
  end

  puts "Runs finished: #{process_count} processes X #{thread_count} threads" +
       " X #{rep_count} steps on #{object_count} objects "

  total_increments = Hash.new(0)
  results.each do |result|
    result[:increments].each do |path, inc|
      total_increments[path] += inc
    end
  end

  total_increments.each do |path, inc|
    unless db[path].x == inc
      raise "Data inconsistency: For path #{path},\n" +
            "file has #{db[path].x} but there were #{inc} increments"
    end
  end

  total_transactions = results.inject(0) do |sum, result|
    sum += result[:transactions]
  end

  total_utime = results.inject(0) do |sum, result|
    sum += result[:utime]
  end

  total_stime = results.inject(0) do |sum, result|
    sum += result[:stime]
  end

  total_time = total_utime + total_stime

  s_pct = 100 * total_stime / total_time

  printf "%20s: %10d\n", "Transactions", total_transactions
  printf "%20s: %13.2f sec (%.1f%% system)\n", "Time", total_time, s_pct
  printf "%20s: %13.2f tr/sec\n", "Rate", total_transactions/total_time

  monitor_thread.kill

  cleanup db, @test_dir
  db.delete @test_dir

end

def self.cleanup db, dir
  db.browse_dir dir do |child_path|
    if child_path[-1] == ?/
      cleanup(db, child_path)
    end
    db.delete(child_path)
  end
end

end

if __FILE__ == $0

# args:
#  process_count thread_count rep_count
#  object_count object_size max_sleep(millisec)

$bench = ARGV.delete("-b")
$profile = ARGV.delete("-p")
$test = !$bench && !$profile

if $profile
  require 'profile'
end

TestConcurrency.go($db, *ARGV.map{|x|x.to_i})

end