feat(loader): enhance single active backend by treating as singleton #5107

Merged · 1 commit · Apr 1, 2025
core/application/application.go (1 addition, 1 deletion)

@@ -16,7 +16,7 @@ type Application struct {
 func newApplication(appConfig *config.ApplicationConfig) *Application {
     return &Application{
         backendLoader:      config.NewBackendConfigLoader(appConfig.ModelPath),
-        modelLoader:        model.NewModelLoader(appConfig.ModelPath),
+        modelLoader:        model.NewModelLoader(appConfig.ModelPath, appConfig.SingleBackend),
         applicationConfig:  appConfig,
         templatesEvaluator: templates.NewEvaluator(appConfig.ModelPath),
     }
core/application/startup.go (1 addition, 1 deletion)

@@ -143,7 +143,7 @@ func New(opts ...config.AppOption) (*Application, error) {
         }()
     }
 
-    if options.LoadToMemory != nil {
+    if options.LoadToMemory != nil && !options.SingleBackend {
         for _, m := range options.LoadToMemory {
             cfg, err := application.BackendLoader().LoadBackendConfigFileByNameDefaultOptions(m, options)
             if err != nil {
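Why the extra guard: in single-backend mode each Load() takes the singleton slot and stops any previously loaded backend, so eagerly preloading a list of models would only load and evict them in sequence, leaving just the last one resident, or block on the lock if a slot is never released. A minimal sketch of the behavior the guard avoids; the loop body is assumed for illustration and is not code from this PR:

    // Assumed illustration: preloading N models with SingleBackend enabled
    // pays startup cost for backends that the next iteration's Load()
    // immediately stops.
    for _, name := range options.LoadToMemory {
        backend, err := ml.Load(model.WithModel(name)) // takes the singleton slot
        if err != nil {
            return err
        }
        ml.Close() // must release the slot or the next Load() blocks forever
        _ = backend
    }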
core/backend/embeddings.go (1 addition)

@@ -17,6 +17,7 @@ func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, backendCo
     if err != nil {
         return nil, err
     }
+    defer loader.Close()
 
     var fn func() ([]float32, error)
     switch model := inferenceModel.(type) {
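This defer loader.Close() line is the caller's half of the singleton contract, and the same one-line change recurs in every inference call site touched by this PR (image, LLM, rerank, sound generation, token metrics, tokenize, transcript, TTS, VAD, and the stores endpoints below): Load() acquires the single-backend slot on success, and Close() releases it when the request finishes. A hedged sketch of the call-site contract these hunks assume; the helper name and body are illustrative:

    // Sketch of the per-request pattern used across core/backend in this PR.
    // In singleton mode, Load() blocks until the slot is free; on failure it
    // releases the slot itself, so Close() is deferred only after success.
    func runInference(ml *model.ModelLoader, opts ...model.Option) error {
        backend, err := ml.Load(opts...)
        if err != nil {
            return err // Load() already released the slot on this path
        }
        defer ml.Close() // release the slot once the request completes

        _ = backend // ... issue the gRPC call against the loaded backend ...
        return nil
    }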
core/backend/image.go (1 addition)

@@ -16,6 +16,7 @@ func ImageGeneration(height, width, mode, step, seed int, positive_prompt, negat
     if err != nil {
         return nil, err
     }
+    defer loader.Close()
 
     fn := func() error {
         _, err := inferenceModel.GenerateImage(
core/backend/llm.go (1 addition)

@@ -53,6 +53,7 @@ func ModelInference(ctx context.Context, s string, messages []schema.Message, im
     if err != nil {
         return nil, err
     }
+    defer loader.Close()
 
     var protoMessages []*proto.Message
     // if we are using the tokenizer template, we need to convert the messages to proto messages
core/backend/options.go (28 additions, 32 deletions)

@@ -40,10 +40,6 @@ func ModelOptions(c config.BackendConfig, so *config.ApplicationConfig, opts ...
     grpcOpts := grpcModelOpts(c)
     defOpts = append(defOpts, model.WithLoadGRPCLoadModelOpts(grpcOpts))
 
-    if so.SingleBackend {
-        defOpts = append(defOpts, model.WithSingleActiveBackend())
-    }
-
     if so.ParallelBackendRequests {
         defOpts = append(defOpts, model.EnableParallelRequests)
     }

The two remaining hunks are whitespace-only realignments of struct-literal fields in grpcModelOpts; the content is unchanged, so they are shown post-change:

@@ -121,7 +117,7 @@ func grpcModelOpts(c config.BackendConfig) *pb.ModelOptions {
     triggers := make([]*pb.GrammarTrigger, 0)
     for _, t := range c.FunctionsConfig.GrammarConfig.GrammarTriggers {
         triggers = append(triggers, &pb.GrammarTrigger{
             Word: t.Word,
         })
 
     }

@@ -161,33 +157,33 @@ func grpcModelOpts(c config.BackendConfig) *pb.ModelOptions {
         DisableLogStatus: c.DisableLogStatus,
         DType:            c.DType,
         // LimitMMPerPrompt vLLM
         LimitImagePerPrompt: int32(c.LimitMMPerPrompt.LimitImagePerPrompt),
         LimitVideoPerPrompt: int32(c.LimitMMPerPrompt.LimitVideoPerPrompt),
         LimitAudioPerPrompt: int32(c.LimitMMPerPrompt.LimitAudioPerPrompt),
         MMProj:              c.MMProj,
         FlashAttention:      c.FlashAttention,
         CacheTypeKey:        c.CacheTypeK,
         CacheTypeValue:      c.CacheTypeV,
         NoKVOffload:         c.NoKVOffloading,
         YarnExtFactor:       c.YarnExtFactor,
         YarnAttnFactor:      c.YarnAttnFactor,
         YarnBetaFast:        c.YarnBetaFast,
         YarnBetaSlow:        c.YarnBetaSlow,
         NGQA:                c.NGQA,
         RMSNormEps:          c.RMSNormEps,
         MLock:               mmlock,
         RopeFreqBase:        c.RopeFreqBase,
         RopeScaling:         c.RopeScaling,
         Type:                c.ModelType,
         RopeFreqScale:       c.RopeFreqScale,
         NUMA:                c.NUMA,
         Embeddings:          embeddings,
         LowVRAM:             lowVRAM,
         NGPULayers:          int32(nGPULayers),
         MMap:                mmap,
         MainGPU:             c.MainGPU,
         Threads:             int32(*c.Threads),
         TensorSplit:         c.TensorSplit,
         // AutoGPTQ
         ModelBaseName: c.AutoGPTQ.ModelBaseName,
         Device:        c.AutoGPTQ.Device,
core/backend/rerank.go (1 addition, 1 deletion)

@@ -12,10 +12,10 @@ import (
 func Rerank(request *proto.RerankRequest, loader *model.ModelLoader, appConfig *config.ApplicationConfig, backendConfig config.BackendConfig) (*proto.RerankResult, error) {
     opts := ModelOptions(backendConfig, appConfig)
     rerankModel, err := loader.Load(opts...)
-
     if err != nil {
         return nil, err
     }
+    defer loader.Close()
 
     if rerankModel == nil {
         return nil, fmt.Errorf("could not load rerank model")
core/backend/soundgeneration.go (1 addition, 1 deletion)

@@ -26,10 +26,10 @@ func SoundGeneration(
 
     opts := ModelOptions(backendConfig, appConfig)
     soundGenModel, err := loader.Load(opts...)
-
     if err != nil {
         return "", nil, err
     }
+    defer loader.Close()
 
     if soundGenModel == nil {
         return "", nil, fmt.Errorf("could not load sound generation model")
core/backend/token_metrics.go (1 addition)

@@ -20,6 +20,7 @@ func TokenMetrics(
     if err != nil {
         return nil, err
     }
+    defer loader.Close()
 
     if model == nil {
         return nil, fmt.Errorf("could not loadmodel model")
core/backend/tokenize.go (1 addition, 1 deletion)

@@ -14,10 +14,10 @@ func ModelTokenize(s string, loader *model.ModelLoader, backendConfig config.Bac
 
     opts := ModelOptions(backendConfig, appConfig)
     inferenceModel, err = loader.Load(opts...)
-
     if err != nil {
         return schema.TokenizeResponse{}, err
     }
+    defer loader.Close()
 
     predictOptions := gRPCPredictOpts(backendConfig, loader.ModelPath)
     predictOptions.Prompt = s
core/backend/transcript.go (1 addition)

@@ -24,6 +24,7 @@ func ModelTranscription(audio, language string, translate bool, ml *model.ModelL
     if err != nil {
         return nil, err
     }
+    defer ml.Close()
 
     if transcriptionModel == nil {
         return nil, fmt.Errorf("could not load transcription model")
core/backend/tts.go (1 addition, 1 deletion)

@@ -23,10 +23,10 @@ func ModelTTS(
 ) (string, *proto.Result, error) {
     opts := ModelOptions(backendConfig, appConfig, model.WithDefaultBackendString(model.PiperBackend))
     ttsModel, err := loader.Load(opts...)
-
     if err != nil {
         return "", nil, err
     }
+    defer loader.Close()
 
     if ttsModel == nil {
         return "", nil, fmt.Errorf("could not load tts model %q", backendConfig.Model)
core/backend/vad.go (2 additions)

@@ -19,6 +19,8 @@ func VAD(request *schema.VADRequest,
     if err != nil {
         return nil, err
     }
+    defer ml.Close()
+
     req := proto.VADRequest{
         Audio: request.Audio,
     }
core/cli/soundgeneration.go (1 addition, 1 deletion)

@@ -74,7 +74,7 @@ func (t *SoundGenerationCMD) Run(ctx *cliContext.Context) error {
         AssetsDestination:    t.BackendAssetsPath,
         ExternalGRPCBackends: externalBackends,
     }
-    ml := model.NewModelLoader(opts.ModelPath)
+    ml := model.NewModelLoader(opts.ModelPath, opts.SingleBackend)
 
     defer func() {
         err := ml.StopAllGRPC()
core/cli/transcript.go (1 addition, 1 deletion)

@@ -32,7 +32,7 @@ func (t *TranscriptCMD) Run(ctx *cliContext.Context) error {
     }
 
     cl := config.NewBackendConfigLoader(t.ModelsPath)
-    ml := model.NewModelLoader(opts.ModelPath)
+    ml := model.NewModelLoader(opts.ModelPath, opts.SingleBackend)
     if err := cl.LoadBackendConfigsFromPath(t.ModelsPath); err != nil {
         return err
     }
core/cli/tts.go (1 addition, 1 deletion)

@@ -41,7 +41,7 @@ func (t *TTSCMD) Run(ctx *cliContext.Context) error {
         AudioDir:          outputDir,
         AssetsDestination: t.BackendAssetsPath,
     }
-    ml := model.NewModelLoader(opts.ModelPath)
+    ml := model.NewModelLoader(opts.ModelPath, opts.SingleBackend)
 
     defer func() {
         err := ml.StopAllGRPC()
core/http/endpoints/localai/stores.go (4 additions)

@@ -21,6 +21,7 @@ func StoresSetEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConfi
         if err != nil {
             return err
         }
+        defer sl.Close()
 
         vals := make([][]byte, len(input.Values))
         for i, v := range input.Values {

@@ -48,6 +49,7 @@ func StoresDeleteEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationCo
         if err != nil {
             return err
         }
+        defer sl.Close()
 
         if err := store.DeleteCols(c.Context(), sb, input.Keys); err != nil {
             return err

@@ -69,6 +71,7 @@ func StoresGetEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConfi
         if err != nil {
             return err
         }
+        defer sl.Close()
 
         keys, vals, err := store.GetCols(c.Context(), sb, input.Keys)
         if err != nil {

@@ -100,6 +103,7 @@ func StoresFindEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConf
         if err != nil {
             return err
         }
+        defer sl.Close()
 
         keys, vals, similarities, err := store.Find(c.Context(), sb, input.Key, input.Topk)
         if err != nil {
core/http/endpoints/openai/assistant_test.go (1 addition, 1 deletion)

@@ -40,7 +40,7 @@ func TestAssistantEndpoints(t *testing.T) {
     cl := &config.BackendConfigLoader{}
     //configsDir := "/tmp/localai/configs"
     modelPath := "/tmp/localai/model"
-    var ml = model.NewModelLoader(modelPath)
+    var ml = model.NewModelLoader(modelPath, false)
 
     appConfig := &config.ApplicationConfig{
         ConfigsDir: configsDir,
core/http/routes/localai.go (4 additions, 5 deletions)

@@ -50,11 +50,10 @@ func RegisterLocalAIRoutes(router *fiber.App,
     router.Post("/v1/vad", vadChain...)
 
     // Stores
-    sl := model.NewModelLoader("")
-    router.Post("/stores/set", localai.StoresSetEndpoint(sl, appConfig))
-    router.Post("/stores/delete", localai.StoresDeleteEndpoint(sl, appConfig))
-    router.Post("/stores/get", localai.StoresGetEndpoint(sl, appConfig))
-    router.Post("/stores/find", localai.StoresFindEndpoint(sl, appConfig))
+    router.Post("/stores/set", localai.StoresSetEndpoint(ml, appConfig))
+    router.Post("/stores/delete", localai.StoresDeleteEndpoint(ml, appConfig))
+    router.Post("/stores/get", localai.StoresGetEndpoint(ml, appConfig))
+    router.Post("/stores/find", localai.StoresFindEndpoint(ml, appConfig))
 
     if !appConfig.DisableMetrics {
         router.Get("/metrics", localai.LocalAIMetricsEndpoint())
pkg/model/initializers.go (20 additions, 1 deletion)

@@ -509,7 +509,23 @@ func (ml *ModelLoader) stopActiveBackends(modelID string, singleActiveBackend bo
     }
 }
 
+func (ml *ModelLoader) Close() {
+    if !ml.singletonMode {
+        return
+    }
+    ml.singletonLock.Unlock()
+}
+
+func (ml *ModelLoader) lockBackend() {
+    if !ml.singletonMode {
+        return
+    }
+    ml.singletonLock.Lock()
+}
+
 func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
+    ml.lockBackend() // grab the singleton lock if needed
+
     o := NewOptions(opts...)
 
     // Return earlier if we have a model already loaded

@@ -520,7 +536,7 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
         return m.GRPC(o.parallelRequests, ml.wd), nil
     }
 
-    ml.stopActiveBackends(o.modelID, o.singleActiveBackend)
+    ml.stopActiveBackends(o.modelID, ml.singletonMode)
 
     // if a backend is defined, return the loader directly
     if o.backendString != "" {

@@ -533,6 +549,7 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
     // get backends embedded in the binary
     autoLoadBackends, err := ml.ListAvailableBackends(o.assetDir)
     if err != nil {
+        ml.Close() // we failed, release the lock
        return nil, err
     }

@@ -564,5 +581,7 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
         }
     }
 
+    ml.Close() // make sure to release the lock in case of failure
+
     return nil, fmt.Errorf("could not load model - all backends returned error: %s", err.Error())
 }
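Taken together, lockBackend() and Close() make the loader act as a gate: in singleton mode Load() blocks until the slot is free, holds it on success, and releases it itself on its internal error paths, while successful callers release it with defer loader.Close() once the request is done. A condensed sketch of the intended lifecycle; this is a simplification with an assumed helper, not the verbatim implementation:

    // Assumed summary of the locking protocol added in this PR.
    func (ml *ModelLoader) loadWithSlot(opts ...Option) (grpc.Backend, error) {
        ml.lockBackend() // no-op unless singletonMode is set
        backend, err := ml.tryLoad(opts...) // hypothetical helper standing in for the body of Load
        if err != nil {
            ml.Close() // error path: the caller never receives a backend
            return nil, err
        }
        return backend, nil // success: the caller owns the slot until Close()
    }

Note that Close() is a conditional unlock, so every successful Load() must be paired with exactly one Close(): an unmatched Close() is a fatal "unlock of unlocked mutex" runtime error in Go, and a missing one would block the next request indefinitely.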
pkg/model/loader.go (10 additions, 7 deletions)

@@ -18,16 +18,19 @@ import (
 
 // TODO: Split ModelLoader and TemplateLoader? Just to keep things more organized. Left together to share a mutex until I look into that. Would split if we seperate directories for .bin/.yaml and .tmpl
 type ModelLoader struct {
-    ModelPath string
-    mu        sync.Mutex
-    models    map[string]*Model
-    wd        *WatchDog
+    ModelPath     string
+    mu            sync.Mutex
+    singletonLock sync.Mutex
+    singletonMode bool
+    models        map[string]*Model
+    wd            *WatchDog
 }
 
-func NewModelLoader(modelPath string) *ModelLoader {
+func NewModelLoader(modelPath string, singleActiveBackend bool) *ModelLoader {
     nml := &ModelLoader{
-        ModelPath: modelPath,
-        models:    make(map[string]*Model),
+        ModelPath:     modelPath,
+        models:        make(map[string]*Model),
+        singletonMode: singleActiveBackend,
     }
 
     return nml
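Singleton behavior is now fixed at construction time rather than per request. A minimal hedged usage sketch; the model path is illustrative:

    // Assumed usage: the second argument enables singleton mode, so every
    // Load()/Close() pair serializes backend access; pass false to keep
    // the previous, unrestricted behavior.
    ml := model.NewModelLoader("/models", true)
    defer func() {
        _ = ml.StopAllGRPC() // shutdown path, unchanged by this PR
    }()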
pkg/model/loader_options.go (3 additions, 10 deletions)

@@ -17,10 +17,9 @@ type Options struct {
 
     externalBackends map[string]string
 
-    grpcAttempts        int
-    grpcAttemptsDelay   int
-    singleActiveBackend bool
-    parallelRequests    bool
+    grpcAttempts      int
+    grpcAttemptsDelay int
+    parallelRequests  bool
 }
 
 type Option func(*Options)

@@ -88,12 +87,6 @@ func WithContext(ctx context.Context) Option {
     }
 }
 
-func WithSingleActiveBackend() Option {
-    return func(o *Options) {
-        o.singleActiveBackend = true
-    }
-}
-
 func WithModelID(id string) Option {
     return func(o *Options) {
         o.modelID = id
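With WithSingleActiveBackend() removed, any downstream caller that passed the option per Load() call must migrate to the constructor flag. A hedged before/after sketch; option names other than those shown in this diff are assumptions:

    // Before (removed in this PR): requested per call.
    //   backend, err := ml.Load(model.WithSingleActiveBackend(), ...)

    // After: requested once when the loader is built.
    ml := model.NewModelLoader(modelPath, true) // true = single active backend
    backend, err := ml.Load(model.WithModel("my-model")) // WithModel assumed
    if err != nil {
        return nil, err
    }
    defer ml.Close() // release the singleton slot when done
    _ = backend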
pkg/model/loader_test.go (1 addition, 1 deletion)

@@ -21,7 +21,7 @@ var _ = Describe("ModelLoader", func() {
         // Setup the model loader with a test directory
         modelPath = "/tmp/test_model_path"
         os.Mkdir(modelPath, 0755)
-        modelLoader = model.NewModelLoader(modelPath)
+        modelLoader = model.NewModelLoader(modelPath, false)
     })
 
     AfterEach(func() {