Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ There are two ways of interacting with AliECS:
* [ECS2DCS2ECS mock server](/core/integration/README.md#ecs2dcs2ecs-mock-server)
* [DD Scheduler](/core/integration/README.md#dd-scheduler)
* [Kafka (legacy)](/core/integration/README.md#kafka-legacy)
* [LHC](/core/integration/README.md#lhc-plugin)
* [ODC](/core/integration/README.md#odc)
* [Test plugin](/core/integration/README.md#test-plugin)
* [Trigger](/core/integration/README.md#trigger)
Expand Down
5 changes: 5 additions & 0 deletions cmd/o2-aliecs-core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/AliceO2Group/Control/core/integration/dcs"
"github.com/AliceO2Group/Control/core/integration/ddsched"
"github.com/AliceO2Group/Control/core/integration/kafka"
"github.com/AliceO2Group/Control/core/integration/lhc"
"github.com/AliceO2Group/Control/core/integration/odc"
"github.com/AliceO2Group/Control/core/integration/testplugin"
"github.com/AliceO2Group/Control/core/integration/trg"
Expand Down Expand Up @@ -64,6 +65,10 @@ func init() {
"kafka",
"kafkaEndpoint",
kafka.NewPlugin)
integration.RegisterPlugin(
"lhc",
"kafkaEndpoints",
lhc.NewPlugin)
integration.RegisterPlugin(
"odc",
"odcEndpoint",
Expand Down
132 changes: 132 additions & 0 deletions common/event/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2025 CERN and copyright holders of ALICE O².
* Author: Piotr Konopka <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package event

import (
"context"
"fmt"
"github.com/AliceO2Group/Control/common/event/topic"
"github.com/AliceO2Group/Control/common/logger/infologger"
pb "github.com/AliceO2Group/Control/common/protos"
"github.com/segmentio/kafka-go"
"github.com/spf13/viper"
"google.golang.org/protobuf/proto"
"sync"
)

// Reader interface provides methods to read events.
// Implementations hand out events one at a time through Next and are shut down with Close.
type Reader interface {
// Next returns the next event or an error; the caller supplies the context.
Next(ctx context.Context) (*pb.Event, error)
// Close closes the reader and reports any error that occurred while doing so.
Close() error
}

// DummyReader is an implementation of Reader that returns no events.
// It has no fields, so the zero value can be used directly.
type DummyReader struct{}

// Next never yields an event: it returns a nil event together with a nil error
// right away and does not inspect the supplied context.
func (*DummyReader) Next(context.Context) (*pb.Event, error) {
	var none *pb.Event
	return none, nil
}

// Close is a no-op for DummyReader; it always returns nil.
func (*DummyReader) Close() error {
	return nil
}

// KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events.
// Consumption mode is chosen at creation time:
// - latestOnly=false: consume everything (from stored offsets or beginning depending on group state)
// - latestOnly=true: seek to latest offsets on start and only receive messages produced after start
//
// The *kafka.Reader is embedded, so its methods are promoted onto KafkaReader
// unless KafkaReader declares a method of the same name (e.g. Close).
type KafkaReader struct {
*kafka.Reader
// mu is not referenced by the KafkaReader methods defined next to this type.
// NOTE(review): confirm whether it is still needed.
mu sync.Mutex
// topic is the name of the Kafka topic this reader was created for.
topic string
}

// NewReaderWithTopic creates a KafkaReader for the provided topic.
// The broker list is read from the "kafkaEndpoints" viper setting.
//
// If latestOnly is true the reader is configured, on a best-effort basis, so that
// messages which already exist in the topic are not replayed:
//   - with a non-empty groupID, the start offset is set to kafka.LastOffset in the
//     reader configuration; kafka-go applies it to partitions for which the consumer
//     group has no committed offset yet;
//   - with an empty groupID, the reader is moved to kafka.LastOffset through SetOffset.
//
// The two cases are handled separately because kafka-go refuses SetOffset on readers
// that belong to a consumer group, so calling it unconditionally would only ever
// produce a warning and leave the reader consuming from the default position.
func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *KafkaReader {
	usesGroup := groupID != ""

	cfg := kafka.ReaderConfig{
		Brokers:  viper.GetStringSlice("kafkaEndpoints"),
		Topic:    string(topic),
		GroupID:  groupID,
		MinBytes: 1,
		MaxBytes: 10e7,
	}
	if latestOnly && usesGroup {
		// For consumer groups the start position must be part of the configuration.
		cfg.StartOffset = kafka.LastOffset
	}

	rk := &KafkaReader{
		Reader: kafka.NewReader(cfg),
		topic:  string(topic),
	}

	if latestOnly && !usesGroup {
		// best-effort: set offset to last so we don't replay older messages
		if err := rk.SetOffset(kafka.LastOffset); err != nil {
			log.WithField(infologger.Level, infologger.IL_Devel).
				Warnf("failed to set offset to last offset: %v", err)
		}
	}

	return rk
}

// Next blocks until the next event is available or ctx is cancelled. It returns an error when the reader is closed
// (io.EOF) or the context is cancelled, and also when the received message cannot be decoded into an event.
// The caller is responsible for providing a cancellable ctx. Calling Next on a nil *KafkaReader yields an error.
func (r *KafkaReader) Next(ctx context.Context) (*pb.Event, error) {
	if r == nil {
		return nil, fmt.Errorf("nil reader")
	}

	raw, readErr := r.ReadMessage(ctx)
	if readErr != nil {
		return nil, readErr
	}

	// kafkaMessageToEvent already returns a nil event alongside any error.
	return kafkaMessageToEvent(raw)
}

// Close stops the reader by closing the embedded kafka reader, which makes a
// pending ReadMessage return an error. A failure to close is logged and also
// returned to the caller. Closing a nil *KafkaReader is a no-op.
func (r *KafkaReader) Close() error {
	if r == nil {
		return nil
	}

	closeErr := r.Reader.Close()
	if closeErr == nil {
		return nil
	}

	log.WithField(infologger.Level, infologger.IL_Devel).
		Errorf("failed to close kafka reader: %v", closeErr)
	return closeErr
}

// kafkaMessageToEvent decodes the value of a Kafka message as a protobuf-encoded
// pb.Event. It returns a nil event and a wrapped error when decoding fails.
func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
	decoded := new(pb.Event)
	err := proto.Unmarshal(m.Value, decoded)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal kafka message: %w", err)
	}
	return decoded, nil
}
48 changes: 48 additions & 0 deletions common/event/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2025 CERN and copyright holders of ALICE O².
* Author: Piotr Konopka <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package event

import (
pb "github.com/AliceO2Group/Control/common/protos"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/segmentio/kafka-go"
"google.golang.org/protobuf/proto"
)

// Round-trip check: an event marshalled with protobuf and wrapped in a
// kafka.Message must come back unchanged from kafkaMessageToEvent.
var _ = Describe("Reader", func() {
	When("converting kafka message to event", func() {
		It("unmarshals protobuf payload correctly", func() {
			coreStart := &pb.Ev_MetaEvent_CoreStart{FrameworkId: "z"}
			original := &pb.Event{
				Payload: &pb.Event_CoreStartEvent{CoreStartEvent: coreStart},
			}

			encoded, marshalErr := proto.Marshal(original)
			Expect(marshalErr).To(BeNil())

			decoded, convertErr := kafkaMessageToEvent(kafka.Message{Value: encoded})
			Expect(convertErr).To(BeNil())
			Expect(decoded.GetCoreStartEvent().FrameworkId).To(Equal("z"))
		})
	})
})
61 changes: 61 additions & 0 deletions core/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/AliceO2Group/Control/common/system"
"github.com/AliceO2Group/Control/common/utils"
"github.com/AliceO2Group/Control/common/utils/uid"
lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event"
event2 "github.com/AliceO2Group/Control/core/integration/odc/event"
"github.com/AliceO2Group/Control/core/task"
"github.com/AliceO2Group/Control/core/task/sm"
Expand Down Expand Up @@ -1065,6 +1066,66 @@ func (envs *Manager) handleIntegratedServiceEvent(evt event.IntegratedServiceEve
}
}
}
} else if evt.GetServiceName() == "LHC" {
envs.handleLhcEvents(evt)
}
}

// handleLhcEvents reacts to LHC state change events.
//
// Events that are not a *lhcevent.LhcStateChangeEvent are ignored, as are events whose
// beam mode is not BEAM_DUMP, LOST_BEAMS or NO_BEAM. Otherwise, every environment that
// is in the RUNNING state and has the "stop_at_beam_dump" variable set to a true value
// gets a stop request, unless a transition is already in progress for it.
// Each stop is performed in its own goroutine, so this function does not wait for it.
func (envs *Manager) handleLhcEvents(evt event.IntegratedServiceEvent) {
	lhcEvent, ok := evt.(*lhcevent.LhcStateChangeEvent)
	if !ok {
		return
	}

	// stop all relevant environments when beams are dumped
	switch lhcEvent.GetBeamInfo().BeamMode {
	case evpb.BeamMode_BEAM_DUMP, evpb.BeamMode_LOST_BEAMS, evpb.BeamMode_NO_BEAM:
		// beams are gone, carry on
	default:
		return
	}

	for envId, env := range envs.m {
		// The parse error is deliberately ignored: a missing or unparsable value
		// yields false, i.e. the environment is left alone.
		shouldStopAtBeamDump, _ := strconv.ParseBool(env.GetKV("", "stop_at_beam_dump"))
		if !shouldStopAtBeamDump || env.CurrentState() != "RUNNING" {
			continue
		}

		// Resolve the partition once per iteration so that the goroutine below does not
		// capture the loop variable (which is shared between iterations before Go 1.22).
		partition := envId.String()

		if currentTransition := env.CurrentTransition(); currentTransition != "" {
			log.WithPrefix("scheduler").
				WithField(infologger.Level, infologger.IL_Support).
				WithField("partition", partition).
				WithField("run", env.currentRunNumber).
				Infof("run was supposed to be stopped at beam dump, but transition '%s' is already in progress, skipping (probably the operator was faster)", currentTransition)
			continue
		}

		go envs.stopRunAtBeamDump(partition, env)
	}
}

// stopRunAtBeamDump tries to stop the activity of env after a beam dump.
// If the stop transition fails and the environment is not already in ERROR, it attempts
// a GO_ERROR transition; if that fails too, the ERROR state is forced.
// partition is the environment ID used to tag log messages.
func (envs *Manager) stopRunAtBeamDump(partition string, env *Environment) {
	log.WithPrefix("scheduler").
		WithField(infologger.Level, infologger.IL_Ops).
		WithField("partition", partition).
		WithField("run", env.currentRunNumber).
		Info("stopping the run due to beam dump")

	err := env.TryTransition(NewStopActivityTransition(envs.taskman))
	if err == nil {
		return
	}
	log.WithPrefix("scheduler").
		WithField("partition", partition).
		WithField("run", env.currentRunNumber).
		WithError(err).
		Error("could not stop the run upon beam dump")

	if env.CurrentState() == "ERROR" {
		return
	}

	if err = env.TryTransition(NewGoErrorTransition(envs.taskman)); err != nil {
		log.WithPrefix("scheduler").
			WithField("partition", partition).
			WithField("run", env.currentRunNumber).
			WithError(err).
			Error("environment GO_ERROR transition failed after a beam dump event, forcing")
		env.setState("ERROR")
	}
}

Expand Down
5 changes: 5 additions & 0 deletions core/integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ DD scheduler plugin informs the Data Distribution software about the pool of FLP

See [Legacy events: Kafka plugin](/docs/kafka.md#legacy-events-kafka-plugin)

## LHC plugin

This plugin listens to Kafka messages coming from the LHC DIP Client and pushes any relevant internal notifications to the AliECS core.
Its main purpose is to provide basic information about ongoing LHC activity (e.g. fill information) to affected parties and to allow AliECS to react to it (e.g. by automatically stopping a physics run when stable beams are over).

## ODC

ODC plugin communicates with the [Online Device Control (ODC)](https://github.com/FairRootGroup/ODC) instance of the ALICE experiment, which controls the event processing farm used in data taking and offline processing.
Expand Down
56 changes: 56 additions & 0 deletions core/integration/lhc/event/lhcevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2025 CERN and copyright holders of ALICE O².
* Author: Piotr Konopka <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package event

import (
"github.com/AliceO2Group/Control/common/event"
commonpb "github.com/AliceO2Group/Control/common/protos"
)

// BeamInfo mirrors (a subset of) the information described in the proto draft.
// All fields are omitted from the JSON encoding when they hold their zero value.
type BeamInfo struct {
// StableBeamsStart and StableBeamsEnd are integer timestamps.
// NOTE(review): unit and epoch are not visible here — confirm against the producer.
StableBeamsStart int64 `json:"stableBeamsStart,omitempty"`
StableBeamsEnd int64 `json:"stableBeamsEnd,omitempty"`
// FillNumber is the number of the LHC fill.
FillNumber int32 `json:"fillNumber,omitempty"`
// FillingSchemeName is the name of the filling scheme.
FillingSchemeName string `json:"fillingSchemeName,omitempty"`
// BeamType is the beam type as a free-form string.
BeamType string `json:"beamType,omitempty"`
// BeamMode is the beam mode, expressed with the protobuf BeamMode enum.
BeamMode commonpb.BeamMode `json:"beamMode,omitempty"`
}

// LhcStateChangeEvent is an integrated service event that carries LHC beam information.
// It embeds event.IntegratedServiceEventBase and adds the BeamInfo payload.
type LhcStateChangeEvent struct {
event.IntegratedServiceEventBase
// BeamInfo is the beam information attached to this event.
BeamInfo BeamInfo
}

// GetName returns the fixed identifier of this event type, "LHC_STATE_CHANGE_EVENT".
func (e *LhcStateChangeEvent) GetName() string {
	const name = "LHC_STATE_CHANGE_EVENT"
	return name
}

// GetBeamInfo returns a copy of the event's BeamInfo.
// It is safe to call on a nil receiver, in which case a zero BeamInfo is returned.
func (e *LhcStateChangeEvent) GetBeamInfo() BeamInfo {
	var info BeamInfo
	if e != nil {
		info = e.BeamInfo
	}
	return info
}
Loading