This repository has been archived by the owner on Jan 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 109
/
engine.go
176 lines (146 loc) · 4.14 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package sqle
import (
"time"
"github.com/go-kit/kit/metrics/discard"
opentracing "github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
"github.com/src-d/go-mysql-server/auth"
"github.com/src-d/go-mysql-server/sql"
"github.com/src-d/go-mysql-server/sql/analyzer"
"github.com/src-d/go-mysql-server/sql/expression/function"
"github.com/src-d/go-mysql-server/sql/parse"
"github.com/src-d/go-mysql-server/sql/plan"
)
// Config for the Engine.
type Config struct {
// VersionPostfix to display with the `VERSION()` UDF.
VersionPostfix string
// Auth used for authentication and authorization.
Auth auth.Auth
}
// Engine is a SQL engine.
type Engine struct {
Catalog *sql.Catalog
Analyzer *analyzer.Analyzer
Auth auth.Auth
}
var (
// QueryCounter describes a metric that accumulates number of queries monotonically.
QueryCounter = discard.NewCounter()
// QueryErrorCounter describes a metric that accumulates number of failed queries monotonically.
QueryErrorCounter = discard.NewCounter()
// QueryHistogram describes a queries latency.
QueryHistogram = discard.NewHistogram()
)
func observeQuery(ctx *sql.Context, query string) func(err error) {
logrus.WithField("query", query).Debug("executing query")
span, _ := ctx.Span("query", opentracing.Tag{Key: "query", Value: query})
t := time.Now()
return func(err error) {
if err != nil {
QueryErrorCounter.With("query", query, "error", err.Error()).Add(1)
} else {
QueryCounter.With("query", query).Add(1)
QueryHistogram.With("query", query, "duration", "seconds").Observe(time.Since(t).Seconds())
}
span.Finish()
}
}
// New creates a new Engine with custom configuration. To create an Engine with
// the default settings use `NewDefault`.
func New(c *sql.Catalog, a *analyzer.Analyzer, cfg *Config) *Engine {
var versionPostfix string
if cfg != nil {
versionPostfix = cfg.VersionPostfix
}
c.MustRegister(
sql.FunctionN{
Name: "version",
Fn: function.NewVersion(versionPostfix),
},
sql.Function0{
Name: "database",
Fn: function.NewDatabase(c),
})
c.MustRegister(function.Defaults...)
// use auth.None if auth is not specified
var au auth.Auth
if cfg == nil || cfg.Auth == nil {
au = new(auth.None)
} else {
au = cfg.Auth
}
return &Engine{c, a, au}
}
// NewDefault creates a new default Engine.
func NewDefault() *Engine {
c := sql.NewCatalog()
a := analyzer.NewDefault(c)
return New(c, a, nil)
}
// Query executes a query.
func (e *Engine) Query(
ctx *sql.Context,
query string,
) (sql.Schema, sql.RowIter, error) {
var (
parsed, analyzed sql.Node
iter sql.RowIter
err error
)
finish := observeQuery(ctx, query)
defer finish(err)
parsed, err = parse.Parse(ctx, query)
if err != nil {
return nil, nil, err
}
var perm = auth.ReadPerm
var typ = sql.QueryProcess
switch parsed.(type) {
case *plan.CreateIndex:
typ = sql.CreateIndexProcess
perm = auth.ReadPerm | auth.WritePerm
case *plan.InsertInto, *plan.DeleteFrom, *plan.Update, *plan.DropIndex, *plan.UnlockTables, *plan.LockTables:
perm = auth.ReadPerm | auth.WritePerm
}
err = e.Auth.Allowed(ctx, perm)
if err != nil {
return nil, nil, err
}
ctx, err = e.Catalog.AddProcess(ctx, typ, query)
defer func() {
if err != nil && ctx != nil {
e.Catalog.Done(ctx.Pid())
}
}()
if err != nil {
return nil, nil, err
}
analyzed, err = e.Analyzer.Analyze(ctx, parsed)
if err != nil {
return nil, nil, err
}
iter, err = analyzed.RowIter(ctx)
if err != nil {
return nil, nil, err
}
return analyzed.Schema(), iter, nil
}
// Async returns true if the query is async. If there are any errors with the
// query it returns false
func (e *Engine) Async(ctx *sql.Context, query string) bool {
parsed, err := parse.Parse(ctx, query)
if err != nil {
return false
}
asyncNode, ok := parsed.(sql.AsyncNode)
return ok && asyncNode.IsAsync()
}
// AddDatabase adds the given database to the catalog.
func (e *Engine) AddDatabase(db sql.Database) {
e.Catalog.AddDatabase(db)
}
// Init performs all the initialization requirements for the engine to work.
func (e *Engine) Init() error {
return e.Catalog.LoadIndexes(e.Catalog.AllDatabases())
}