プログラムを高速化した話


2023年 06月 13日

何万回,何億回と繰り返されるループ内で実行されるプログラムは,プログラム全体の実行速度に大きな影響を与えます.ループ内プログラムの高速化は,プログラムが遅いと思った時の有効な手段です.
この記事では,ループを何億回と繰り返すプログラムを高速化した話を紹介します.

問題

あるItem(商品)をどのようなGroup(倉庫/工場/広告媒体など)に振り分けるのが良いかを考えます.
ここでは,振り分けして得られた結果の分析を高速化することを考えます.

Go言語で説明をします.以下のデータ構造が与えられたとします.

type Item struct {
	Id               string            `json:"id"`
	AllocatedGroupId string            `json:"allocatedGroupId"`
	Attributes       map[string]string `json:"attributes"`
}

type Group struct {
	Id         string            `json:"id"`
	Attributes map[string]string `json:"attributes"`
}

type Condition struct {
	Id         string            `json:"id"`
	CalcMethod string            `json:"calcMethod"`
	Target     string            `json:"target"`
	Filters    map[string]string `json:"filters"`
}

Item, Groupの配列と Conditionがあります.データ例は,以下の通りです.

{
    "id": "item001",
    "allocatedGroupId": "group022",
    "attributes": {
        "name": "三色ボールペン",
        "category": "ペン",
        "price": "80",
        "weight": "34.4",
        "release": "2021",
        "seller": "A社",
        "discount": "不可",
        ...
    }
}
{
    "id": "group022",
    "attributes": {
        "name": "東京第3倉庫2F",
        "affiliation": "東京第3倉庫",
        "address": "東京都XXX",
        "capacity": "1800",
        "useOfShips": "false",
        "heightLimit": "230",
        "weightLimit": "500",
        ...
    }
}
{
    "id": "condition001",
    "calcMethod": "mean",
    "target": "price",
    "filters": {
        "category": "ペン",
        "affiliation": "東京第3倉庫"
    }
}

入力データ構造は,他の機能との兼ね合いや汎用性を考慮したものになっています.
東京第3倉庫にあるペンItemについて,その平均価格を高速に求めるのがミッションです.

愚直な実装 (calcTime01, 8.730秒)

前処理なしのプログラムを実装します.

func calcTime01(items []Item, groups []Group, condition Condition) {
	now := time.Now()
	for i := 0; i < 1e5; i++ {
		calc01(items, groups, condition)
	}
	fmt.Println(time.Since(now))
}

func calc01(items []Item, groups []Group, condition Condition) float64 {
	numbers := make([]float64, 0, len(items))
Loop:
	for _, item := range items {

		// itemとgroupの紐づけ
		f := func(x Group) bool { return item.AllocatedGroupId == x.Id }
		group, _ := lo.Find(groups, f)
		maps.Copy(item.Attributes, group.Attributes)

		// condition.Filtersの適用
		for key, value := range condition.Filters {
			if item.Attributes[key] != value {
				continue Loop
			}
		}

		// condition.Targetの処理
		value := item.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		numbers = append(numbers, number)
	}

	// switchの分岐
	switch condition.CalcMethod {
	case "sum":
		return floats.Sum(numbers)
	case "product":
		return floats.Prod(numbers)
	case "max":
		return floats.Max(numbers)
	case "min":
		return floats.Min(numbers)
	case "mean":
		return stat.Mean(numbers, nil)
	case "std":
		return stat.StdDev(numbers, nil)
	}
	return 0
}

プログラムの実行時間は,8.7301438秒でした.問題点がたくさんあります.改良していきましょう.

switchの分岐を変更した実装 (calcTime02, 8.670秒)

switchの分岐をループの外側で実行します.合わせて,スカラー値の計算もcalc関数の外側に移動します.

func calcTime02(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐
	var function func([]float64) float64
	switch condition.CalcMethod {
	case "sum":
		function = func(x []float64) float64 { return floats.Sum(x) }
	case "product":
		function = func(x []float64) float64 { return floats.Prod(x) }
	case "max":
		function = func(x []float64) float64 { return floats.Max(x) }
	case "min":
		function = func(x []float64) float64 { return floats.Min(x) }
	case "mean":
		function = func(x []float64) float64 { return stat.Mean(x, nil) }
	case "std":
		function = func(x []float64) float64 { return stat.StdDev(x, nil) }
	}

	for i := 0; i < 1e5; i++ {
		numbers := calc02(items, groups, condition)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc02(items []Item, groups []Group, condition Condition) []float64 {
	numbers := make([]float64, 0, len(items))
Loop:
	for _, item := range items {
		f := func(x Group) bool { return item.AllocatedGroupId == x.Id }
		group, _ := lo.Find(groups, f)
		maps.Copy(item.Attributes, group.Attributes)

		for key, value := range condition.Filters {
			if item.Attributes[key] != value {
				continue Loop
			}
		}

		value := item.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		numbers = append(numbers, number)
	}
	return values
}

プログラムの実行時間は,8.6701029秒でした.この調子で.改良していきましょう.

groupsを変更した実装 (calcTime03, 7.402秒)

groupの検索処理を変更します.

func calcTime03(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐 省略

	// newGroupsの作成
	f := func(x Group) (string, map[string]string) { return x.Id, x.Attributes }
	newGroups := lo.Associate(groups, f)

	for i := 0; i < 1e5; i++ {
		numbers := calc03(items, newGroups, condition)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc03(items []Item, groups map[string]map[string]string, condition Condition) []float64 {
	numbers := make([]float64, 0, len(items))
Loop:
	for _, item := range items {

		// itemとgroupの紐づけ
		maps.Copy(item.Attributes, groups[item.AllocatedGroupId])

		// condition.Filtersの適用
		for key, value := range condition.Filters {
			if item.Attributes[key] != value {
				continue Loop
			}
		}

		// condition.Targetの処理
		value := item.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		numbers = append(numbers, number)
	}
	return numbers
}

プログラムの実行時間は,7.4015557秒でした.groupsには,まだ改良の余地がありそうです.

groupsを改良した実装 (calcTime04, 0.946秒)

group.Attributesの不要なデータを前処理で削減します.

func calcTime04(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐 省略

	// newGroupsの作成
	keys := append(lo.Keys(condition.Filters), condition.Target)
	f := func(x Group) (string, map[string]string) { return x.Id, lo.PickByKeys(x.Attributes, keys) }
	newGroups := lo.Associate(groups, f)

	for i := 0; i < 1e5; i++ {
		numbers := calc04(items, newGroups, condition)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc04(items []Item, groups map[string]map[string]string, condition Condition) []float64 {
	numbers := make([]float64, 0, len(items))
Loop:
	for _, item := range items {

		// itemとgroupの紐づけ
		maps.Copy(item.Attributes, groups[item.AllocatedGroupId])

		// condition.Filtersの適用
		for key, value := range condition.Filters {
			if item.Attributes[key] != value {
				continue Loop
			}
		}

		// condition.Targetの処理
		value := item.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		numbers = append(numbers, number)
	}
	return numbers
}

プログラムの実行時間は,0.9466403秒でした.groupsの改良で,24行目のコピー処理が高速になりました.実行時間は1秒を切りましたが,更なる高速化を目指します.

itemsを変更した実装 (calcTime05, 0.496秒)

itemgroupの対応関係は変化するデータであり,前処理で対処できません.しかし,それ以外のデータはitemsも変更できるものとします.前処理でitemscondition.Filtersを適用します.

func calcTime05(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐 省略

	// newItemsの作成
	newItems := lo.Filter(items, func(x Item, _ int) bool {
		for key, value := range condition.Filters {
			value2, exist := x.Attributes[key]
			if exist && value2 != value {
				return false
			}
		}
		return true
	})

	// newGroupsの作成
	keys := append(lo.Keys(condition.Filters), condition.Target)
	f := func(x Group) (string, map[string]string) { return x.Id, lo.PickByKeys(x.Attributes, keys) }
	newGroups := lo.Associate(groups, f)

	for i := 0; i < 1e5; i++ {
		numbers := calc05(newItems, newGroups, condition)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc05(items []Item, groups map[string]map[string]string, condition Condition) []float64 {
	numbers := make([]float64, 0, len(items))
Loop:
	for _, item := range items {

		// itemとgroupの紐づけ
		maps.Copy(item.Attributes, groups[item.AllocatedGroupId])

		// condition.Filtersの適用
		for key, value := range condition.Filters {
			if item.Attributes[key] != value {
				continue Loop
			}
		}

		// condition.Targetの処理
		value := item.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		numbers = append(numbers, number)
	}
	return numbers
}

プログラムの実行時間は,0.4960819秒でした.前処理によるデータ削減が効いています.

conditionの改修 (calcTime06, 0.075秒)

前処理でgroupsに対してもcondition.Filtersを適用します.

func calcTime06(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐 省略

	// newItemsの作成
	newItems := lo.Filter(items, func(x Item, _ int) bool {
		for key, value := range condition.Filters {
			value2, exist := x.Attributes[key]
			if exist && value2 != value {
				return false
			}
		}
		return true
	})

	// newGroupsの作成
	newGroups := lo.Associate(groups, func(x Group) (string, map[string]string) {
		for key, value := range condition.Filters {
			value2, exist := x.Attributes[key]
			if exist && value2 != value {
				return x.Id, nil
			}
		}
		return x.Id, lo.PickByKeys(x.Attributes, []string{condition.Target})
	})

	for i := 0; i < 1e5; i++ {
		numbers := calc06(newItems, newGroups, condition.Target)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc06(items []Item, groups map[string]map[string]string, target string) []float64 {
	numbers := make([]float64, 0, len(items))
	for _, item := range items {

		// itemとgroupの紐づけとフィルター
		groupAttributes := groups[item.AllocatedGroupId]
		if groupAttributes == nil {
			continue
		}
		maps.Copy(item.Attributes, groupAttributes)

		// condition.Targetの処理
		value := item.Attributes[target]
		number, _ := strconv.ParseFloat(value, 64)
		numbers = append(numbers, number)
	}
	return numbers
}

プログラムの実行時間は,0.0749873秒でした.44行目のコピー実行回数の減少が高速化に効果的です.

conditionの削除 (calcTime07, 0.055秒)

ループ内でconditionを呼び出さないようにします.

func calcTime07(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐 省略

	// newItemsの作成 省略

	// itemsTargetの作成
	itemsTarget := lo.Map(newItems, func(x Item, _ int) float64 {
		value := x.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		return number
	})

	// groupsTargetの作成
	groupsTarget := lo.Associate(groups, func(x Group) (string, *float64) {
		for key, value := range condition.Filters {
			value2, exist := x.Attributes[key]
			if exist && value2 != value {
				return x.Id, nil
			}
		}
		value := x.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		return x.Id, &number
	})

	for i := 0; i < 1e5; i++ {
		numbers := calc07(newItems, itemsTarget, groupsTarget)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc07(items []Item, itemsTarget []float64, groupsTarget map[string]*float64) []float64 {
	numbers := make([]float64, 0, len(items))
	for i, item := range items {
		groupTarget := groupsTarget[item.AllocatedGroupId]
		if groupTarget == nil {
			continue
		}
		numbers = append(numbers, itemsTarget[i]+*groupTarget)
	}
	return numbers
}

プログラムの実行時間は,0.0551922秒でした.ループ内の処理が少なくなりました.
condition.TargetitemgroupのどちらのAttributesでも対応できる形式です.
42行目は,condition.Targetが指定するAttributesの数値 + 0を計算しています.

itemsの改修 (calcTime08, 0.051秒)

itemallocatedGroupIdを配列にできると,プログラムは更に高速化できます.

func calcTime08(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐 省略

	// newItemsの作成 省略

	// allocatedGroupIdsの作成
	f := func(x Item, _ int) string { return x.AllocatedGroupId }
	allocatedGroupIds := lo.Map(newItems, f)

	// itemsTargetの作成 省略

	// groupsTargetの作成 省略

	for i := 0; i < 1e5; i++ {
		numbers := calc08(allocatedGroupIds, itemsTarget, groupsTarget)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc08(groupIds []string, itemsTarget []float64, groupsTarget map[string]*float64) []float64 {
	numbers := make([]float64, 0, len(groupIds))
	for i := range groupIds {
		groupTarget := groupsTarget[groupIds[i]]
		if groupTarget == nil {
			continue
		}
		numbers = append(numbers, itemsTarget[i]+*groupTarget)
	}
	return numbers
}

プログラムの実行時間は,0.0512611秒でした.改修もあと少しです.

idをindexに変更 (calcTime09, 0.012秒)

今まではitemgroupの紐づけにidを使っていましたが,idをindexに変えると高速化できます.

func calcTime09(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐 省略

	// newItemsの作成 省略

	// allocatedGroupIndexesの作成
	allocatedGroupIndexes := lo.Map(newItems, func(item Item, _ int) int {
		f := func(group Group) bool { return item.AllocatedGroupId == group.Id }
		_, index, _ := lo.FindIndexOf(groups, f)
		return index
	})

	// itemsTargetの作成 省略

	// groupsTargetの作成
	groupsTarget := lo.Map(groups, func(x Group, _ int) *float64 {
		for key, value := range condition.Filters {
			value2, exist := x.Attributes[key]
			if exist && value2 != value {
				return nil
			}
		}
		value := x.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		return &number
	})

	for i := 0; i < 1e5; i++ {
		numbers := calc09(allocatedGroupIndexes, itemsTarget, groupsTarget)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc09(groupIndexes []int, itemsTarget []float64, groupsTarget []*float64) []float64 {
	numbers := make([]float64, 0, len(groupIndexes))
	for i := range groupIndexes {
		groupTarget := groupsTarget[groupIndexes[i]]
		if groupTarget == nil {
			continue
		}
		numbers = append(numbers, itemsTarget[i]+*groupTarget)
	}
	return numbers
}

プログラムの実行時間は,0.0123349秒でした.最後に,プログラムをキレイに整理します

最終的なプログラム (calcTime10, 0.010秒)

func calcTime10(items []Item, groups []Group, condition Condition) {
	now := time.Now()

	// switchの分岐
	var function func([]float64) float64
	switch condition.CalcMethod {
	case "sum":
		function = func(x []float64) float64 { return floats.Sum(x) }
	case "product":
		function = func(x []float64) float64 { return floats.Prod(x) }
	case "max":
		function = func(x []float64) float64 { return floats.Max(x) }
	case "min":
		function = func(x []float64) float64 { return floats.Min(x) }
	case "mean":
		function = func(x []float64) float64 { return stat.Mean(x, nil) }
	case "std":
		function = func(x []float64) float64 { return stat.StdDev(x, nil) }
	}

	// newItemsの作成
	newItems := lo.Filter(items, func(x Item, _ int) bool {
		for key, value := range condition.Filters {
			value2, exist := x.Attributes[key]
			if exist && value2 != value {
				return false
			}
		}
		return true
	})

	// allocatedGroupIndexesの作成
	allocatedGroupIndexes := lo.Map(newItems, func(item Item, _ int) int {
		f := func(group Group) bool { return item.AllocatedGroupId == group.Id }
		_, index, _ := lo.FindIndexOf(groups, f)
		return index
	})

	// itemsTargetの作成
	itemsTarget := lo.Map(newItems, func(x Item, _ int) float64 {
		value := x.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		return number
	})

	// groupsTargetの作成
	groupsTarget := lo.Map(groups, func(x Group, _ int) *float64 {
		for key, value := range condition.Filters {
			value2, exist := x.Attributes[key]
			if exist && value2 != value {
				return nil
			}
		}
		value := x.Attributes[condition.Target]
		number, _ := strconv.ParseFloat(value, 64)
		return &number
	})

	for i := 0; i < 1e5; i++ {
		numbers := calc10(allocatedGroupIndexes, itemsTarget, groupsTarget)
		function(numbers)
	}
	fmt.Println(time.Since(now))
}

func calc10(groupIndexes []int, itemsTarget []float64, groupsTarget []*float64) []float64 {
	numbers := make([]float64, 0, len(groupIndexes))
	for i := range groupIndexes {
		if groupTarget := groupsTarget[groupIndexes[i]]; groupTarget != nil {
			numbers = append(numbers, itemsTarget[i]+*groupTarget)
		}
	}
	return numbers
}

プログラムの実行時間は,0.0100372秒でした.プログラムを870倍高速化できました.

補足

今回の問題設定では,ItemGroupの対応関係のみ変化する想定です.プログラムの高速化では,変化する部分のみを抜き出し,それに影響しない部分を予め計算しておくことが大事です.

変化する部分 (items[].AllocatedGroupId, allocatedGroupIds, groupIndexes) は,組み合わせ最適化の分野では設計変数ベクトル,決定変数ベクトル,遺伝子,解などの言葉で表現します.
設計変数ベクトルは,プログラムを高速に動かすため,単純な数値の配列で管理することをお勧めします.

最適化アルゴリズムや最適化ライブラリは高速に動くので,高速化の余地は少ないです.
最適化プログラムが遅いと感じた人は,設計変数ベクトルの表現と評価関数を見直してみてください.

感想・終わりに

プログラムを高速化した話を紹介しました.
コードを数十行書き換えることで,数百倍高速に動くことが明らかになりました.
コードの読みやすさも考慮しつつ,高速なプログラムの実装を目指しましょう.