mirror of
https://github.com/paralus/paralus.git
synced 2026-02-14 17:49:51 +00:00
initial commit for changes to support database as backend for audit logs
This commit is contained in:
43
gen/openapi/proto/types/audit/audit.swagger.json
Normal file
43
gen/openapi/proto/types/audit/audit.swagger.json
Normal file
@@ -0,0 +1,43 @@
|
||||
{
|
||||
"swagger": "2.0",
|
||||
"info": {
|
||||
"title": "proto/types/audit/audit.proto",
|
||||
"version": "version not set"
|
||||
},
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"paths": {},
|
||||
"definitions": {
|
||||
"protobufAny": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"@type": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"additionalProperties": {}
|
||||
},
|
||||
"rpcStatus": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"code": {
|
||||
"type": "integer",
|
||||
"format": "int32"
|
||||
},
|
||||
"message": {
|
||||
"type": "string"
|
||||
},
|
||||
"details": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/protobufAny"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
124
internal/dao/auditlog.go
Normal file
124
internal/dao/auditlog.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/paralus/paralus/internal/models"
|
||||
"github.com/paralus/paralus/pkg/query"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
func GetAuditLogAggregations(ctx context.Context, db *bun.DB, tag, field string, filters query.QueryFilters) ([]models.AggregatorData, error) {
|
||||
var adata []models.AggregatorData
|
||||
sq := db.NewSelect().Table("audit_logs").
|
||||
ColumnExpr("count(1) as count")
|
||||
|
||||
switch field {
|
||||
case "type":
|
||||
sq.ColumnExpr("data->>'type' as key").
|
||||
Where("tag = ?", tag).GroupExpr("data->>'type'")
|
||||
case "username":
|
||||
if tag != "kubectl_api" {
|
||||
sq.ColumnExpr("data->'actor'->'account'->>'username' as key").
|
||||
Where("tag = ?", tag).GroupExpr("data->'actor'->'account'->>'username'")
|
||||
} else {
|
||||
sq.ColumnExpr("data->>'un' as key").
|
||||
Where("tag = ?", tag).GroupExpr("data->>'un'")
|
||||
}
|
||||
case "project":
|
||||
sq.ColumnExpr("data->>'project' as key").
|
||||
Where("tag = ?", tag).GroupExpr("data->>'project'")
|
||||
case "cluster":
|
||||
sq.ColumnExpr("data->>'cn' as key").
|
||||
Where("tag = ?", tag).GroupExpr("data->>'cn'")
|
||||
case "namespace":
|
||||
sq.ColumnExpr("data->>'n' as key").
|
||||
Where("tag = ?", tag).GroupExpr("data->>'n'")
|
||||
case "kind":
|
||||
sq.ColumnExpr("data->>'k' as key").
|
||||
Where("tag = ?", tag).GroupExpr("data->>'k'")
|
||||
case "method":
|
||||
sq.ColumnExpr("data->>'m' as key").
|
||||
Where("tag = ?", tag).GroupExpr("data->>'m'")
|
||||
}
|
||||
|
||||
// add filters
|
||||
switch tag {
|
||||
case "kubectl_api":
|
||||
sq = buildRelayAuditQuery(sq, filters)
|
||||
case "system", "kubectl_cmd":
|
||||
sq = buildQuery(sq, filters)
|
||||
}
|
||||
|
||||
err := sq.Scan(ctx, &adata)
|
||||
return adata, err
|
||||
}
|
||||
|
||||
func GetAuditLogs(ctx context.Context, db *bun.DB, tag string, filters query.QueryFilters) ([]models.AuditLog, error) {
|
||||
var logs []models.AuditLog
|
||||
sq := db.NewSelect().Model(&logs).
|
||||
Where("tag = ?", tag)
|
||||
|
||||
switch tag {
|
||||
case "kubectl_api":
|
||||
sq = buildRelayAuditQuery(sq, filters)
|
||||
case "system", "kubectl_cmd":
|
||||
sq = buildQuery(sq, filters)
|
||||
}
|
||||
|
||||
err := sq.Scan(ctx)
|
||||
return logs, err
|
||||
}
|
||||
|
||||
func buildRelayAuditQuery(query *bun.SelectQuery, filters query.QueryFilters) *bun.SelectQuery {
|
||||
if filters.GetUser() != "" {
|
||||
query.Where("data->>'un' = ?", filters.GetUser())
|
||||
}
|
||||
|
||||
if filters.GetKind() != "" {
|
||||
query.Where("data->>'k' = ?", filters.GetKind())
|
||||
}
|
||||
|
||||
if filters.GetMethod() != "" {
|
||||
query.Where("data->>'m' = ?", filters.GetMethod())
|
||||
}
|
||||
if filters.GetNamespace() != "" {
|
||||
query.Where("data->>'ns' = ?", filters.GetNamespace())
|
||||
}
|
||||
if filters.GetCluster() != "" {
|
||||
query.Where("data->>'cn' = ?", filters.GetCluster())
|
||||
}
|
||||
if filters.GetTimefrom() != "" {
|
||||
diff := strings.Split(filters.GetTimefrom(), "-")[1]
|
||||
query.Where("to_timestamp(data->>'ts', 'YYYY-MM-DD\"T\"HH:MI:SS') between now() - interval ? and now()", diff)
|
||||
}
|
||||
return query
|
||||
}
|
||||
|
||||
func buildQuery(query *bun.SelectQuery, filters query.QueryFilters) *bun.SelectQuery {
|
||||
if len(filters.GetProjects()) > 0 {
|
||||
for _, project := range filters.GetProjects() {
|
||||
query.Where("data->>'project' = ?", project)
|
||||
}
|
||||
}
|
||||
|
||||
if filters.GetType() != "" {
|
||||
query.Where("data->>'type' = ?", filters.GetType())
|
||||
}
|
||||
|
||||
if filters.GetUser() != "" {
|
||||
query.Where("data->'actor'->'account'->>'username' = ?", filters.GetUser())
|
||||
}
|
||||
|
||||
if filters.GetClient() != "" {
|
||||
query.Where("data->'client'->>'type' = ?", filters.GetClient())
|
||||
}
|
||||
|
||||
if filters.GetTimefrom() != "" {
|
||||
diff := strings.Split(filters.GetTimefrom(), "-")[1]
|
||||
query.Where("to_timestamp(data->>'timestamp', 'YYYY-MM-DD\"T\"HH:MI:SS') between now() - interval ? and now()", diff)
|
||||
}
|
||||
|
||||
return query
|
||||
}
|
||||
21
internal/models/auditlog.go
Normal file
21
internal/models/auditlog.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
type AuditLog struct {
|
||||
bun.BaseModel `bun:"table:audit_logs,alias:auditlog"`
|
||||
|
||||
Tag string `bun:"tag,notnull"`
|
||||
Time time.Time `bun:"time,notnull"`
|
||||
Data json.RawMessage `bun:"data,type:jsonb,notnull"`
|
||||
}
|
||||
|
||||
type AggregatorData struct {
|
||||
Count int64
|
||||
Key string
|
||||
}
|
||||
96
main.go
96
main.go
@@ -74,6 +74,7 @@ const (
|
||||
relayImageEnv = "RELAY_IMAGE"
|
||||
|
||||
// audit
|
||||
auditLogStorageEnv = "AUDIT_LOG_STORAGE"
|
||||
auditFileEnv = "AUDIT_LOG_FILE"
|
||||
esEndPointEnv = "ES_END_POINT"
|
||||
esIndexPrefixEnv = "ES_INDEX_PREFIX"
|
||||
@@ -117,6 +118,7 @@ var (
|
||||
relayImage string
|
||||
|
||||
// audit
|
||||
auditLogStorage string
|
||||
auditFile string
|
||||
elasticSearchUrl string
|
||||
esIndexPrefix string
|
||||
@@ -157,9 +159,9 @@ var (
|
||||
rrs service.RolepermissionService
|
||||
is service.IdpService
|
||||
oidcs service.OIDCProviderService
|
||||
aus *service.AuditLogService
|
||||
ras *service.RelayAuditService
|
||||
rcs *service.AuditLogService
|
||||
aus service.AuditLogService
|
||||
ras service.RelayAuditService
|
||||
rcs service.AuditLogService
|
||||
|
||||
schedulerPool schedulerrpc.SchedulerPool
|
||||
schedulerAddr string
|
||||
@@ -196,6 +198,7 @@ func setup() {
|
||||
viper.SetDefault(relayImageEnv, "paralusio/relay:v0.1.0")
|
||||
|
||||
// audit
|
||||
viper.SetDefault(auditLogStorageEnv, "database")
|
||||
viper.SetDefault(esEndPointEnv, "http://127.0.0.1:9200")
|
||||
viper.SetDefault(esIndexPrefixEnv, "ralog-system")
|
||||
viper.SetDefault(relayAuditESIndexPrefixEnv, "ralog-relay")
|
||||
@@ -236,6 +239,7 @@ func setup() {
|
||||
viper.BindEnv(relayImageEnv)
|
||||
viper.BindEnv(schedulerNamespaceEnv)
|
||||
|
||||
viper.BindEnv(auditLogStorageEnv)
|
||||
viper.BindEnv(auditFileEnv)
|
||||
viper.BindEnv(esEndPointEnv)
|
||||
viper.BindEnv(esIndexPrefixEnv)
|
||||
@@ -267,6 +271,7 @@ func setup() {
|
||||
schedulerNamespace = viper.GetString(schedulerNamespaceEnv)
|
||||
sentryBootstrapAddr = viper.GetString(sentryBootstrapEnv)
|
||||
|
||||
auditLogStorage = viper.GetString(auditLogStorageEnv)
|
||||
auditFile = viper.GetString(auditFileEnv)
|
||||
elasticSearchUrl = viper.GetString(esEndPointEnv)
|
||||
esIndexPrefix = viper.GetString(esIndexPrefixEnv)
|
||||
@@ -359,34 +364,69 @@ func setup() {
|
||||
aps = service.NewAccountPermissionService(db)
|
||||
gps = service.NewGroupPermissionService(db)
|
||||
|
||||
// audit services
|
||||
aus, err = service.NewAuditLogService(elasticSearchUrl, esIndexPrefix+"-*", "AuditLog API: ")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
// This is primarily from ES not being available. ES being
|
||||
// pretty heavy, you might not always wanna have it
|
||||
// running in the background. This way, you can continue
|
||||
// working on paralus with ES eating up all the cpu.
|
||||
_log.Warn("unable to create auditLog service: ", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create auditLog service", "error", err)
|
||||
switch auditLogStorage {
|
||||
case "database":
|
||||
// audit services
|
||||
aus, err = service.NewAuditLogDatabaseService(db, "system")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
// This is primarily from ES not being available. ES being
|
||||
// pretty heavy, you might not always wanna have it
|
||||
// running in the background. This way, you can continue
|
||||
// working on paralus with ES eating up all the cpu.
|
||||
_log.Warn("unable to create auditLog service: ", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create auditLog service", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
ras, err = service.NewRelayAuditService(elasticSearchUrl, relayAuditsESIndexPrefix+"-*", "RelayAudit API: ")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
_log.Warn("unable to create relayAudit service: ", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create relayAudit service", "error", err)
|
||||
ras, err = service.NewRelayAuditDatabaseService(db, "kubectl_api")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
_log.Warn("unable to create relayAudit service: ", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create relayAudit service", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
rcs, err = service.NewAuditLogService(elasticSearchUrl, relayCommandsESIndexPrefix+"-*", "RelayCommand API: ")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
_log.Warn("unable to create auditLog service:", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create auditLog service", "error", err)
|
||||
rcs, err = service.NewAuditLogDatabaseService(db, "kubectl_cmd")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
_log.Warn("unable to create auditLog service:", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create auditLog service", "error", err)
|
||||
}
|
||||
}
|
||||
case "elasticsearch":
|
||||
// audit services
|
||||
aus, err = service.NewAuditLogElasticSearchService(elasticSearchUrl, esIndexPrefix+"-*", "AuditLog API: ")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
// This is primarily from ES not being available. ES being
|
||||
// pretty heavy, you might not always wanna have it
|
||||
// running in the background. This way, you can continue
|
||||
// working on paralus with ES eating up all the cpu.
|
||||
_log.Warn("unable to create auditLog service: ", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create auditLog service", "error", err)
|
||||
}
|
||||
}
|
||||
ras, err = service.NewRelayAuditElasticSearchService(elasticSearchUrl, relayAuditsESIndexPrefix+"-*", "RelayAudit API: ")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
_log.Warn("unable to create relayAudit service: ", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create relayAudit service", "error", err)
|
||||
}
|
||||
}
|
||||
rcs, err = service.NewAuditLogElasticSearchService(elasticSearchUrl, relayCommandsESIndexPrefix+"-*", "RelayCommand API: ")
|
||||
if err != nil {
|
||||
if dev && strings.Contains(err.Error(), "connect: connection refused") {
|
||||
_log.Warn("unable to create auditLog service:", err)
|
||||
} else {
|
||||
_log.Fatalw("unable to create auditLog service", "error", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
|
||||
}
|
||||
|
||||
// cluster bootstrap
|
||||
|
||||
20
pkg/query/filters.go
Normal file
20
pkg/query/filters.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package query
|
||||
|
||||
import v1 "github.com/paralus/paralus/proto/rpc/audit"
|
||||
|
||||
type QueryFilters interface {
|
||||
GetType() string
|
||||
GetUser() string
|
||||
GetClient() string
|
||||
GetTimefrom() string
|
||||
GetPortal() string
|
||||
GetCluster() string
|
||||
GetNamespace() string
|
||||
GetKind() string
|
||||
GetMethod() string
|
||||
GetQueryString() string
|
||||
GetProjects() []string
|
||||
}
|
||||
|
||||
var _ QueryFilters = (*v1.AuditLogQueryFilter)(nil)
|
||||
var _ QueryFilters = (*v1.RelayAuditQueryFilter)(nil)
|
||||
40
pkg/service/audit_log.go
Normal file
40
pkg/service/audit_log.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
v1 "github.com/paralus/paralus/proto/rpc/audit"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
type AuditLogService interface {
|
||||
GetAuditLog(req *v1.GetAuditLogSearchRequest) (res *v1.GetAuditLogSearchResponse, err error)
|
||||
GetAuditLogByProjects(req *v1.GetAuditLogSearchRequest) (res *v1.GetAuditLogSearchResponse, err error)
|
||||
}
|
||||
|
||||
func NewAuditLogElasticSearchService(url string, auditPattern string, logPrefix string) (AuditLogService, error) {
|
||||
auditQuery, err := NewElasticSearchQuery(url, auditPattern, logPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &auditLogElasticSearchService{auditQuery: auditQuery}, nil
|
||||
}
|
||||
|
||||
func NewAuditLogDatabaseService(db *bun.DB, tag string) (AuditLogService, error) {
|
||||
return &auditLogDatabaseService{db: db, tag: tag}, nil
|
||||
}
|
||||
|
||||
type RelayAuditService interface {
|
||||
GetRelayAudit(req *v1.RelayAuditRequest) (res *v1.RelayAuditResponse, err error)
|
||||
GetRelayAuditByProjects(req *v1.RelayAuditRequest) (res *v1.RelayAuditResponse, err error)
|
||||
}
|
||||
|
||||
func NewRelayAuditDatabaseService(db *bun.DB, tag string) (RelayAuditService, error) {
|
||||
return &relayAuditDatabaseService{db: db, tag: tag}, nil
|
||||
}
|
||||
|
||||
func NewRelayAuditElasticSearchService(url string, auditPattern string, logPrefix string) (RelayAuditService, error) {
|
||||
relayQuery, err := NewElasticSearchQuery(url, auditPattern, logPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &relayAuditElasticSearchService{relayQuery: relayQuery}, nil
|
||||
}
|
||||
102
pkg/service/audit_log_db.go
Normal file
102
pkg/service/audit_log_db.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/paralus/paralus/internal/dao"
|
||||
"github.com/paralus/paralus/internal/models"
|
||||
v1 "github.com/paralus/paralus/proto/rpc/audit"
|
||||
auditv1 "github.com/paralus/paralus/proto/types/audit"
|
||||
"github.com/uptrace/bun"
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
)
|
||||
|
||||
type auditLogDatabaseService struct {
|
||||
db *bun.DB
|
||||
tag string
|
||||
}
|
||||
|
||||
func (a *auditLogDatabaseService) GetAuditLog(req *v1.GetAuditLogSearchRequest) (res *v1.GetAuditLogSearchResponse, err error) {
|
||||
project, err := getProjectFromUrlScope(req.GetMetadata().UrlScope)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Filter.Projects = []string{project}
|
||||
return a.GetAuditLogByProjects(req)
|
||||
}
|
||||
|
||||
func buildAggregators(aggr []models.AggregatorData) []*auditv1.GroupByType {
|
||||
var groups = make([]*auditv1.GroupByType, 0)
|
||||
for _, agg := range aggr {
|
||||
groups = append(groups, &auditv1.GroupByType{
|
||||
DocCount: int32(agg.Count),
|
||||
Key: agg.Key,
|
||||
})
|
||||
}
|
||||
return groups
|
||||
}
|
||||
|
||||
func buildDataSource(logs []models.AuditLog) (ds []*auditv1.DataSource) {
|
||||
for _, log := range logs {
|
||||
data := &auditv1.Data{}
|
||||
json.Unmarshal(log.Data, data)
|
||||
ds = append(ds, &auditv1.DataSource{
|
||||
XSource: &auditv1.DataSourceJSON{
|
||||
Json: data,
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
return ds
|
||||
}
|
||||
|
||||
func (a *auditLogDatabaseService) GetAuditLogByProjects(req *v1.GetAuditLogSearchRequest) (res *v1.GetAuditLogSearchResponse, err error) {
|
||||
err = validateQueryString(req.GetFilter().QueryString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := context.Background()
|
||||
auditLogs, err := dao.GetAuditLogs(ctx, a.db, a.tag, req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// aggregations
|
||||
_, err = dao.GetAuditLogAggregations(ctx, a.db, a.tag, "project", req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
usernameAggr, err := dao.GetAuditLogAggregations(ctx, a.db, a.tag, "username", req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
typeAggr, err := dao.GetAuditLogAggregations(ctx, a.db, a.tag, "type", req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response := &auditv1.AuditResponse{
|
||||
Aggregations: &auditv1.Aggregations{
|
||||
GroupByType: &auditv1.AggregatorGroup{
|
||||
Buckets: buildAggregators(typeAggr),
|
||||
},
|
||||
GroupByUsername: &auditv1.AggregatorGroup{
|
||||
Buckets: buildAggregators(usernameAggr),
|
||||
},
|
||||
},
|
||||
Hits: &auditv1.Hits{Hits: buildDataSource(auditLogs)},
|
||||
}
|
||||
|
||||
var resMap map[string]interface{}
|
||||
data, _ := json.Marshal(response)
|
||||
json.Unmarshal(data, &resMap)
|
||||
|
||||
result, _ := structpb.NewStruct(resMap)
|
||||
res = &v1.GetAuditLogSearchResponse{
|
||||
Result: result,
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
@@ -10,23 +10,15 @@ import (
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
)
|
||||
|
||||
type AuditLogService struct {
|
||||
type auditLogElasticSearchService struct {
|
||||
auditQuery ElasticSearchQuery
|
||||
}
|
||||
|
||||
func NewAuditLogService(url string, auditPattern string, logPrefix string) (*AuditLogService, error) {
|
||||
auditQuery, err := NewElasticSearchQuery(url, auditPattern, logPrefix)
|
||||
func (a *auditLogElasticSearchService) GetAuditLog(req *v1.GetAuditLogSearchRequest) (res *v1.GetAuditLogSearchResponse, err error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &AuditLogService{auditQuery: auditQuery}, nil
|
||||
}
|
||||
|
||||
func (a *AuditLogService) GetAuditLog(req *v1.GetAuditLogSearchRequest) (res *v1.GetAuditLogSearchResponse, err error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
project, err := getPrjectFromUrlScope(req.GetMetadata().UrlScope)
|
||||
project, err := getProjectFromUrlScope(req.GetMetadata().UrlScope)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -44,7 +36,7 @@ func validateQueryString(queryString string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getPrjectFromUrlScope(urlScope string) (string, error) {
|
||||
func getProjectFromUrlScope(urlScope string) (string, error) {
|
||||
s := strings.Split(urlScope, "/")
|
||||
if len(s) != 2 {
|
||||
_log.Errorw("Unable to retrieve project from urlScope", "urlScope", urlScope)
|
||||
@@ -53,7 +45,7 @@ func getPrjectFromUrlScope(urlScope string) (string, error) {
|
||||
return s[1], nil
|
||||
}
|
||||
|
||||
func (a *AuditLogService) GetAuditLogByProjects(req *v1.GetAuditLogSearchRequest) (res *v1.GetAuditLogSearchResponse, err error) {
|
||||
func (a *auditLogElasticSearchService) GetAuditLogByProjects(req *v1.GetAuditLogSearchRequest) (res *v1.GetAuditLogSearchResponse, err error) {
|
||||
err = validateQueryString(req.GetFilter().QueryString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -86,7 +86,7 @@ func (m *mockElasticSearchQuery) Handle(msg bytes.Buffer) (map[string]interface{
|
||||
|
||||
func TestGetAuditLogByProjectsSimple(t *testing.T) {
|
||||
esq := &mockElasticSearchQuery{}
|
||||
al := &AuditLogService{auditQuery: esq}
|
||||
al := &auditLogElasticSearchService{auditQuery: esq}
|
||||
req := v1.GetAuditLogSearchRequest{
|
||||
Filter: &v1.AuditLogQueryFilter{
|
||||
QueryString: "query-string",
|
||||
@@ -118,7 +118,7 @@ func TestGetAuditLogByProjectsSimple(t *testing.T) {
|
||||
|
||||
func TestGetAuditLogByProjectsNoProject(t *testing.T) {
|
||||
esq := &mockElasticSearchQuery{}
|
||||
al := &AuditLogService{auditQuery: esq}
|
||||
al := &auditLogElasticSearchService{auditQuery: esq}
|
||||
req := v1.GetAuditLogSearchRequest{
|
||||
Metadata: &v3.Metadata{UrlScope: "url/project"},
|
||||
Filter: &v1.AuditLogQueryFilter{
|
||||
99
pkg/service/relay_audit_db.go
Normal file
99
pkg/service/relay_audit_db.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/paralus/paralus/internal/dao"
|
||||
v1 "github.com/paralus/paralus/proto/rpc/audit"
|
||||
auditv1 "github.com/paralus/paralus/proto/types/audit"
|
||||
"github.com/uptrace/bun"
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
)
|
||||
|
||||
type relayAuditDatabaseService struct {
|
||||
db *bun.DB
|
||||
tag string
|
||||
}
|
||||
|
||||
func (ra *relayAuditDatabaseService) GetRelayAudit(req *v1.RelayAuditRequest) (res *v1.RelayAuditResponse, err error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
project, err := getProjectFromUrlScope(req.GetMetadata().UrlScope)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Filter.Projects = []string{project}
|
||||
return ra.GetRelayAuditByProjects(req)
|
||||
}
|
||||
|
||||
func (ra *relayAuditDatabaseService) GetRelayAuditByProjects(req *v1.RelayAuditRequest) (res *v1.RelayAuditResponse, err error) {
|
||||
err = validateQueryString(req.GetFilter().QueryString)
|
||||
if err != nil {
|
||||
return &v1.RelayAuditResponse{}, err
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
auditLogs, err := dao.GetAuditLogs(ctx, ra.db, ra.tag, req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// aggregations
|
||||
clusterAggr, err := dao.GetAuditLogAggregations(ctx, ra.db, ra.tag, "cluster", req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
usernameAggr, err := dao.GetAuditLogAggregations(ctx, ra.db, ra.tag, "username", req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nsAggr, err := dao.GetAuditLogAggregations(ctx, ra.db, ra.tag, "namespace", req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kindAggr, err := dao.GetAuditLogAggregations(ctx, ra.db, ra.tag, "kind", req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
methodAggr, err := dao.GetAuditLogAggregations(ctx, ra.db, ra.tag, "method", req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response := &auditv1.AuditResponse{
|
||||
Aggregations: &auditv1.Aggregations{
|
||||
GroupByCluster: &auditv1.AggregatorGroup{
|
||||
Buckets: buildAggregators(clusterAggr),
|
||||
},
|
||||
GroupByUsername: &auditv1.AggregatorGroup{
|
||||
Buckets: buildAggregators(usernameAggr),
|
||||
},
|
||||
GroupByNamespace: &auditv1.AggregatorGroup{
|
||||
Buckets: buildAggregators(nsAggr),
|
||||
},
|
||||
GroupByKind: &auditv1.AggregatorGroup{
|
||||
Buckets: buildAggregators(kindAggr),
|
||||
},
|
||||
GroupByMethod: &auditv1.AggregatorGroup{
|
||||
Buckets: buildAggregators(methodAggr),
|
||||
},
|
||||
},
|
||||
Hits: &auditv1.Hits{Hits: buildDataSource(auditLogs)},
|
||||
}
|
||||
|
||||
var resMap map[string]interface{}
|
||||
data, _ := json.Marshal(response)
|
||||
json.Unmarshal(data, &resMap)
|
||||
|
||||
result, _ := structpb.NewStruct(resMap)
|
||||
res = &v1.RelayAuditResponse{
|
||||
Result: result,
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
@@ -8,23 +8,15 @@ import (
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
)
|
||||
|
||||
type RelayAuditService struct {
|
||||
type relayAuditElasticSearchService struct {
|
||||
relayQuery ElasticSearchQuery
|
||||
}
|
||||
|
||||
func NewRelayAuditService(url string, auditPattern string, logPrefix string) (*RelayAuditService, error) {
|
||||
relayQuery, err := NewElasticSearchQuery(url, auditPattern, logPrefix)
|
||||
func (ra *relayAuditElasticSearchService) GetRelayAudit(req *v1.RelayAuditRequest) (res *v1.RelayAuditResponse, err error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &RelayAuditService{relayQuery: relayQuery}, nil
|
||||
}
|
||||
|
||||
func (ra *RelayAuditService) GetRelayAudit(req *v1.RelayAuditRequest) (res *v1.RelayAuditResponse, err error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
project, err := getPrjectFromUrlScope(req.GetMetadata().UrlScope)
|
||||
project, err := getProjectFromUrlScope(req.GetMetadata().UrlScope)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -32,7 +24,7 @@ func (ra *RelayAuditService) GetRelayAudit(req *v1.RelayAuditRequest) (res *v1.R
|
||||
return ra.GetRelayAuditByProjects(req)
|
||||
}
|
||||
|
||||
func (ra *RelayAuditService) GetRelayAuditByProjects(req *v1.RelayAuditRequest) (res *v1.RelayAuditResponse, err error) {
|
||||
func (ra *relayAuditElasticSearchService) GetRelayAuditByProjects(req *v1.RelayAuditRequest) (res *v1.RelayAuditResponse, err error) {
|
||||
err = validateQueryString(req.GetFilter().QueryString)
|
||||
if err != nil {
|
||||
return &v1.RelayAuditResponse{}, err
|
||||
@@ -86,7 +86,7 @@ type rmd struct {
|
||||
|
||||
func TestGetRelayAuditLogByProjectsSimple(t *testing.T) {
|
||||
esq := &mockElasticSearchQuery{}
|
||||
al := &RelayAuditService{relayQuery: esq}
|
||||
al := &relayAuditElasticSearchService{relayQuery: esq}
|
||||
req := v1.RelayAuditRequest{
|
||||
Filter: &v1.RelayAuditQueryFilter{
|
||||
QueryString: "query-string",
|
||||
@@ -122,7 +122,7 @@ func TestGetRelayAuditLogByProjectsSimple(t *testing.T) {
|
||||
|
||||
func TestGetRelayAuditLogByProjectsNoProject(t *testing.T) {
|
||||
esq := &mockElasticSearchQuery{}
|
||||
al := &RelayAuditService{relayQuery: esq}
|
||||
al := &relayAuditElasticSearchService{relayQuery: esq}
|
||||
req := v1.RelayAuditRequest{
|
||||
Metadata: &v3.Metadata{UrlScope: "url/project"},
|
||||
Filter: &v1.RelayAuditQueryFilter{
|
||||
1347
proto/types/audit/audit.pb.go
Normal file
1347
proto/types/audit/audit.pb.go
Normal file
File diff suppressed because it is too large
Load Diff
98
proto/types/audit/audit.proto
Normal file
98
proto/types/audit/audit.proto
Normal file
@@ -0,0 +1,98 @@
|
||||
syntax = "proto3";
|
||||
package paralus.dev.types.audit.v1;
|
||||
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
message AuditResponse {
|
||||
Aggregations aggregations = 1;
|
||||
Hits hits = 2;
|
||||
}
|
||||
|
||||
message Aggregations {
|
||||
AggregatorGroup group_by_type = 1;
|
||||
AggregatorGroup group_by_username = 2;
|
||||
AggregatorGroup group_by_cluster = 3;
|
||||
AggregatorGroup group_by_kind = 4;
|
||||
AggregatorGroup group_by_method = 5;
|
||||
AggregatorGroup group_by_namespace = 6;
|
||||
}
|
||||
|
||||
message AggregatorGroup {
|
||||
repeated GroupByType buckets = 1;
|
||||
}
|
||||
|
||||
message GroupByType {
|
||||
int32 doc_count = 1;
|
||||
string key = 2;
|
||||
}
|
||||
|
||||
message DataSourceJSON {
|
||||
Data json = 1;
|
||||
}
|
||||
|
||||
message DataSource {
|
||||
DataSourceJSON _source = 1;
|
||||
}
|
||||
|
||||
message Hits {
|
||||
repeated DataSource hits = 1;
|
||||
}
|
||||
|
||||
message Data {
|
||||
Actor actor = 1;
|
||||
string category = 2;
|
||||
Client client = 3;
|
||||
Detail detail = 4;
|
||||
string origin = 5;
|
||||
string portal = 6;
|
||||
string project = 7;
|
||||
string timestamp = 8;
|
||||
string type = 9;
|
||||
string version = 10;
|
||||
|
||||
// below are for kubectl api, attribute names are maintained as below for consistency with es. need to be refactored to be more meaningful.
|
||||
string av = 11;
|
||||
string cn = 12;
|
||||
double d = 13;
|
||||
string id = 14;
|
||||
string k = 15;
|
||||
string m = 16;
|
||||
string n = 17;
|
||||
string ns = 18;
|
||||
string o = 19;
|
||||
string p = 20;
|
||||
string q = 21;
|
||||
string ra = 22;
|
||||
int32 sc = 23;
|
||||
string st = 24;
|
||||
string ts = 25;
|
||||
string un = 26;
|
||||
string url = 27;
|
||||
int32 w = 28;
|
||||
}
|
||||
|
||||
message Actor {
|
||||
Account account = 1;
|
||||
repeated string groups = 2;
|
||||
string type = 3;
|
||||
}
|
||||
|
||||
message Account {
|
||||
string username = 1;
|
||||
}
|
||||
|
||||
message Client {
|
||||
string host = 1;
|
||||
string ip = 2;
|
||||
string type = 3;
|
||||
string user_agent = 4;
|
||||
}
|
||||
|
||||
message Detail {
|
||||
string message = 1;
|
||||
DetailMeta meta = 2;
|
||||
}
|
||||
|
||||
message DetailMeta {
|
||||
string cluster_name = 1;
|
||||
}
|
||||
@@ -8,13 +8,13 @@ import (
|
||||
)
|
||||
|
||||
type auditLogServer struct {
|
||||
as *q.AuditLogService
|
||||
as q.AuditLogService
|
||||
}
|
||||
|
||||
var _ v1.AuditLogServiceServer = (*auditLogServer)(nil)
|
||||
|
||||
// NewAuditServer returns new placement server implementation
|
||||
func NewAuditLogServer(auditLogService *q.AuditLogService) (v1.AuditLogServiceServer, error) {
|
||||
func NewAuditLogServer(auditLogService q.AuditLogService) (v1.AuditLogServiceServer, error) {
|
||||
return &auditLogServer{as: auditLogService}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -10,14 +10,14 @@ import (
|
||||
)
|
||||
|
||||
type relayAuditServer struct {
|
||||
rs *q.RelayAuditService
|
||||
al *q.AuditLogService
|
||||
rs q.RelayAuditService
|
||||
al q.AuditLogService
|
||||
}
|
||||
|
||||
var _ v1.RelayAuditServiceServer = (*relayAuditServer)(nil)
|
||||
|
||||
// NewAuditServer returns new placement server implementation
|
||||
func NewRelayAuditServer(relayAuditService *q.RelayAuditService, relayCommandAuditService *q.AuditLogService) (v1.RelayAuditServiceServer, error) {
|
||||
func NewRelayAuditServer(relayAuditService q.RelayAuditService, relayCommandAuditService q.AuditLogService) (v1.RelayAuditServiceServer, error) {
|
||||
return &relayAuditServer{
|
||||
rs: relayAuditService,
|
||||
al: relayCommandAuditService,
|
||||
|
||||
Reference in New Issue
Block a user