diff --git a/ecs-logging-core/src/main/java/co/elastic/logging/DataStreamFieldSanitizer.java b/ecs-logging-core/src/main/java/co/elastic/logging/DataStreamFieldSanitizer.java
new file mode 100644
index 00000000..950ecac0
--- /dev/null
+++ b/ecs-logging-core/src/main/java/co/elastic/logging/DataStreamFieldSanitizer.java
@@ -0,0 +1,59 @@
+/*-
+ * #%L
+ * Java ECS logging
+ * %%
+ * Copyright (C) 2019 - 2021 Elastic and contributors
+ * %%
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * #L%
+ */
+package co.elastic.logging;
+
+import java.util.Locale;
+
+/**
+ * Based on https://github.com/elastic/ecs/blob/master/rfcs/text/0009-data_stream-fields.md#restrictions-on-values and
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params
+ */
+public class DataStreamFieldSanitizer {
+
+    private static final char[] DISALLOWED_IN_DATASET = new char[]{'\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':', '-'};
+    private static final char[] DISALLOWED_IN_NAMESPACE = new char[]{'\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':'};
+    private static final int MAX_LENGTH = 100;
+    private static final char REPLACEMENT_CHAR = '_';
+
+    public static String sanitizeDataStreamDataset(String dataset) {
+        return sanitazeDataStreamField(dataset, DISALLOWED_IN_DATASET);
+    }
+
+    public static String sanitizeDataStreamNamespace(String dataStreamNamespace) {
+        return sanitazeDataStreamField(dataStreamNamespace, DISALLOWED_IN_NAMESPACE);
+    }
+
+    private static String sanitazeDataStreamField(String dataset, char[] disallowedInDataset) {
+        if (dataset == null || dataset.isEmpty()) {
+            return dataset;
+        }
+        dataset = dataset.toLowerCase(Locale.ROOT)
+                .substring(0, Math.min(dataset.length(), MAX_LENGTH));
+        for (char c : disallowedInDataset) {
+            dataset = dataset.replace(c, REPLACEMENT_CHAR);
+        }
+        return dataset;
+    }
+}
diff --git a/ecs-logging-core/src/main/java/co/elastic/logging/EcsJsonSerializer.java b/ecs-logging-core/src/main/java/co/elastic/logging/EcsJsonSerializer.java
index 20312b38..1e21368f 100644
--- a/ecs-logging-core/src/main/java/co/elastic/logging/EcsJsonSerializer.java
+++ b/ecs-logging-core/src/main/java/co/elastic/logging/EcsJsonSerializer.java
@@ -91,9 +91,16 @@ public static void serializeServiceName(StringBuilder builder, String serviceNam
         }
     }
 
-    public static void serializeEventDataset(StringBuilder builder, String eventDataset) {
+    public static void serializeDataset(StringBuilder builder, String eventDataset) {
         if (eventDataset != null) {
             builder.append("\"event.dataset\":\"").append(eventDataset).append("\",");
+            builder.append("\"data_stream.dataset\":\"").append(eventDataset).append("\",");
+        }
+    }
+
+    public static void serializeNamespace(StringBuilder builder, String namespace) {
+        if (namespace != null) {
+            builder.append("\"data_stream.namespace\":\"").append(namespace).append("\",");
         }
     }
 
@@ -276,11 +283,11 @@ public static StringBuilder getMessageStringBuilder() {
         return result;
     }
 
-    public static String computeEventDataset(String eventDataset, String serviceName) {
-        if (eventDataset == null && serviceName != null && !serviceName.isEmpty()) {
+    public static String computeDataset(String dataset, String serviceName) {
+        if (dataset == null && serviceName != null && !serviceName.isEmpty()) {
             return serviceName + ".log";
         }
-        return eventDataset;
+        return DataStreamFieldSanitizer.sanitizeDataStreamDataset(dataset);
     }
 
     public static void serializeAdditionalFields(StringBuilder builder, List<AdditionalField> additionalFields) {
diff --git a/ecs-logging-core/src/test/java/co/elastic/logging/AbstractEcsLoggingTest.java b/ecs-logging-core/src/test/java/co/elastic/logging/AbstractEcsLoggingTest.java
index b5b53c86..6cc0cf0c 100644
--- a/ecs-logging-core/src/test/java/co/elastic/logging/AbstractEcsLoggingTest.java
+++ b/ecs-logging-core/src/test/java/co/elastic/logging/AbstractEcsLoggingTest.java
@@ -63,6 +63,8 @@ void testMetadata() throws Exception {
         assertThat(getAndValidateLastLogLine().get("log.level").textValue()).isIn("DEBUG", "FINE");
         assertThat(getAndValidateLastLogLine().get("log.logger")).isNotNull();
         assertThat(getAndValidateLastLogLine().get("event.dataset").textValue()).isEqualTo("testdataset.log");
+        assertThat(getAndValidateLastLogLine().get("data_stream.dataset").textValue()).isEqualTo("testdataset.log");
+        assertThat(getAndValidateLastLogLine().get("data_stream.namespace").textValue()).isEqualTo("custom_namespace");
         assertThat(getAndValidateLastLogLine().get("ecs.version").textValue()).isEqualTo("1.2.0");
         validateLog(getAndValidateLastLogLine());
     }
diff --git a/jboss-logmanager-ecs-formatter/src/main/java/co/elastic/logging/jboss/logmanager/EcsFormatter.java b/jboss-logmanager-ecs-formatter/src/main/java/co/elastic/logging/jboss/logmanager/EcsFormatter.java
index 34f99e35..d72c5abf 100644
--- a/jboss-logmanager-ecs-formatter/src/main/java/co/elastic/logging/jboss/logmanager/EcsFormatter.java
+++ b/jboss-logmanager-ecs-formatter/src/main/java/co/elastic/logging/jboss/logmanager/EcsFormatter.java
@@ -24,28 +24,35 @@
  */
 package co.elastic.logging.jboss.logmanager;
 
+import co.elastic.logging.DataStreamFieldSanitizer;
 import co.elastic.logging.EcsJsonSerializer;
 import co.elastic.logging.AdditionalField;
 import org.jboss.logmanager.ExtFormatter;
 import org.jboss.logmanager.ExtLogRecord;
 import org.jboss.logmanager.LogManager;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 public class EcsFormatter extends ExtFormatter {
 
     private String serviceName;
-    private String eventDataset;
+    private String dataset;
+    private String dataStreamNamespace;
     private List<AdditionalField> additionalFields = Collections.emptyList();
     private boolean includeOrigin;
     private boolean stackTraceAsArray;
 
     public EcsFormatter() {
-        serviceName = getProperty("co.elastic.logging.jboss.logmanager.EcsFormatter.serviceName", null);
-        eventDataset = getProperty("co.elastic.logging.jboss.logmanager.EcsFormatter.eventDataset", null);
-        eventDataset = EcsJsonSerializer.computeEventDataset(eventDataset, serviceName);
+        setServiceName(getProperty("co.elastic.logging.jboss.logmanager.EcsFormatter.serviceName", null));
+        String dataset = getProperty("co.elastic.logging.jboss.logmanager.EcsFormatter.dataStreamDataset", null);
+        if (dataset == null) {
+            dataset = getProperty("co.elastic.logging.jboss.logmanager.EcsFormatter.eventDataset", null);
+        }
+        setDataset(EcsJsonSerializer.computeDataset(dataset, serviceName));
+
+        setDataStreamNamespace(getProperty("co.elastic.logging.jboss.logmanager.EcsFormatter.dataStreamNamespace", null));
+
         includeOrigin = Boolean.getBoolean(getProperty("co.elastic.logging.jboss.logmanager.EcsFormatter.includeOrigin", "false"));
         stackTraceAsArray = Boolean.getBoolean(getProperty("co.elastic.logging.jboss.logmanager.EcsFormatter.stackTraceAsArray", "false"));
     }
@@ -58,7 +65,8 @@ public String format(ExtLogRecord record) {
         EcsJsonSerializer.serializeFormattedMessage(builder, record.getFormattedMessage());
         EcsJsonSerializer.serializeEcsVersion(builder);
         EcsJsonSerializer.serializeServiceName(builder, serviceName);
-        EcsJsonSerializer.serializeEventDataset(builder, eventDataset);
+        EcsJsonSerializer.serializeDataset(builder, dataset);
+        EcsJsonSerializer.serializeNamespace(builder, dataStreamNamespace);
         EcsJsonSerializer.serializeThreadName(builder, record.getThreadName());
         EcsJsonSerializer.serializeLoggerName(builder, record.getLoggerName());
         EcsJsonSerializer.serializeAdditionalFields(builder, additionalFields);
@@ -88,7 +96,7 @@ public void setIncludeOrigin(final boolean includeOrigin) {
 
     public void setServiceName(final String serviceName) {
         this.serviceName = serviceName;
-        eventDataset = EcsJsonSerializer.computeEventDataset(eventDataset, serviceName);
+        setDataset(EcsJsonSerializer.computeDataset(dataset, serviceName));
     }
 
     public void setStackTraceAsArray(final boolean stackTraceAsArray) {
@@ -96,7 +104,21 @@ public void setStackTraceAsArray(final boolean stackTraceAsArray) {
     }
 
     public void setEventDataset(String eventDataset) {
-        this.eventDataset = eventDataset;
+        setDataset(eventDataset);
+    }
+
+    public void setDataStreamDataset(String eventDataset) {
+        setDataset(eventDataset);
+    }
+
+    private void setDataset(String dataset) {
+        if (dataset != null) {
+            this.dataset = DataStreamFieldSanitizer.sanitizeDataStreamDataset(dataset);
+        }
+    }
+
+    public void setDataStreamNamespace(String dataStreamNamespace) {
+        this.dataStreamNamespace = DataStreamFieldSanitizer.sanitizeDataStreamNamespace(dataStreamNamespace);
     }
 
     public void setAdditionalFields(String additionalFields) {
diff --git a/jboss-logmanager-ecs-formatter/src/test/java/co/elastic/logging/jboss/logmanager/JBossLogManagerTest.java b/jboss-logmanager-ecs-formatter/src/test/java/co/elastic/logging/jboss/logmanager/JBossLogManagerTest.java
index 0350fc17..7201d07c 100644
--- a/jboss-logmanager-ecs-formatter/src/test/java/co/elastic/logging/jboss/logmanager/JBossLogManagerTest.java
+++ b/jboss-logmanager-ecs-formatter/src/test/java/co/elastic/logging/jboss/logmanager/JBossLogManagerTest.java
@@ -90,6 +90,7 @@ void setUp() {
         formatter.setIncludeOrigin(true);
         formatter.setServiceName("test");
         formatter.setEventDataset("testdataset.log");
+        formatter.setDataStreamNamespace("custom*namespace");
         formatter.setAdditionalFields("key1=value1,key2=value2");
 
         logger.setLevel(Level.ALL);
diff --git a/jul-ecs-formatter/src/main/java/co/elastic/logging/jul/EcsFormatter.java b/jul-ecs-formatter/src/main/java/co/elastic/logging/jul/EcsFormatter.java
index 73e8cb7e..09e80e41 100644
--- a/jul-ecs-formatter/src/main/java/co/elastic/logging/jul/EcsFormatter.java
+++ b/jul-ecs-formatter/src/main/java/co/elastic/logging/jul/EcsFormatter.java
@@ -31,6 +31,7 @@
 import java.util.logging.LogRecord;
 
 import co.elastic.logging.AdditionalField;
+import co.elastic.logging.DataStreamFieldSanitizer;
 import co.elastic.logging.EcsJsonSerializer;
 
 public class EcsFormatter extends Formatter {
@@ -41,19 +42,24 @@ public class EcsFormatter extends Formatter {
     private boolean stackTraceAsArray;
     private String serviceName;
     private boolean includeOrigin;
-    private String eventDataset;
+    private String dataset;
+    private String dataStreamNamespace;
     private List<AdditionalField> additionalFields = Collections.emptyList();
 
     /**
      * Default constructor. Will read configuration from LogManager properties.
      */
     public EcsFormatter() {
-        serviceName = getProperty("co.elastic.logging.jul.EcsFormatter.serviceName", null);
+        setServiceName(getProperty("co.elastic.logging.jul.EcsFormatter.serviceName", null));
         includeOrigin = Boolean.getBoolean(getProperty("co.elastic.logging.jul.EcsFormatter.includeOrigin", "false"));
         stackTraceAsArray = Boolean
                 .getBoolean(getProperty("co.elastic.logging.jul.EcsFormatter.stackTraceAsArray", "false"));
-        eventDataset = getProperty("co.elastic.logging.jul.EcsFormatter.eventDataset", null);
-        eventDataset = EcsJsonSerializer.computeEventDataset(eventDataset, serviceName);
+        String dataset = getProperty("co.elastic.logging.jul.EcsFormatter.dataStreamDataset", null);
+        if (dataset == null) {
+            dataset = getProperty("co.elastic.logging.jul.EcsFormatter.eventDataset", null);
+        }
+        setDataset(dataset);
+        setDataStreamNamespace(getProperty("co.elastic.logging.jul.EcsFormatter.dataStreamNamespace", null));
     }
 
     @Override
@@ -66,7 +72,8 @@ public String format(final LogRecord record) {
         EcsJsonSerializer.serializeAdditionalFields(builder, additionalFields);
         EcsJsonSerializer.serializeMDC(builder, mdcSupplier.getMDC());
         EcsJsonSerializer.serializeServiceName(builder, serviceName);
-        EcsJsonSerializer.serializeEventDataset(builder, eventDataset);
+        EcsJsonSerializer.serializeDataset(builder, dataset);
+        EcsJsonSerializer.serializeNamespace(builder, dataStreamNamespace);
         if (Thread.currentThread().getId() == record.getThreadID()) {
             EcsJsonSerializer.serializeThreadName(builder, Thread.currentThread().getName());
         } else {
@@ -90,6 +97,7 @@ protected void setIncludeOrigin(final boolean includeOrigin) {
 
     protected void setServiceName(final String serviceName) {
         this.serviceName = serviceName;
+        setDataset(EcsJsonSerializer.computeDataset(dataset, serviceName));
     }
 
     protected void setStackTraceAsArray(final boolean stackTraceAsArray) {
@@ -97,7 +105,21 @@ protected void setStackTraceAsArray(final boolean stackTraceAsArray) {
     }
     
     public void setEventDataset(String eventDataset) {
-        this.eventDataset = eventDataset;
+        setDataset(eventDataset);
+    }
+
+    public void setDataStreamDataset(String eventDataset) {
+        setDataset(eventDataset);
+    }
+
+    private void setDataset(String dataset) {
+        if (dataset != null) {
+            this.dataset = DataStreamFieldSanitizer.sanitizeDataStreamDataset(dataset);
+        }
+    }
+
+    public void setDataStreamNamespace(String dataStreamNamespace) {
+        this.dataStreamNamespace = DataStreamFieldSanitizer.sanitizeDataStreamNamespace(dataStreamNamespace);
     }
 
     public void setAdditionalFields(String additionalFields) {
diff --git a/jul-ecs-formatter/src/test/java/co/elastic/logging/jul/JulLoggingTest.java b/jul-ecs-formatter/src/test/java/co/elastic/logging/jul/JulLoggingTest.java
index 50630fa1..9cfaecda 100644
--- a/jul-ecs-formatter/src/test/java/co/elastic/logging/jul/JulLoggingTest.java
+++ b/jul-ecs-formatter/src/test/java/co/elastic/logging/jul/JulLoggingTest.java
@@ -115,6 +115,7 @@ void setUp() {
         formatter.setIncludeOrigin(true);
         formatter.setServiceName("test");
         formatter.setEventDataset("testdataset.log");
+        formatter.setDataStreamNamespace("custom*namespace");
         formatter.setAdditionalFields("key1=value1,key2=value2");
         
         Handler handler = new InMemoryStreamHandler(out, formatter);
diff --git a/log4j-ecs-layout/src/main/java/co/elastic/logging/log4j/EcsLayout.java b/log4j-ecs-layout/src/main/java/co/elastic/logging/log4j/EcsLayout.java
index 6be4cb41..d60b6bbb 100644
--- a/log4j-ecs-layout/src/main/java/co/elastic/logging/log4j/EcsLayout.java
+++ b/log4j-ecs-layout/src/main/java/co/elastic/logging/log4j/EcsLayout.java
@@ -24,6 +24,7 @@
  */
 package co.elastic.logging.log4j;
 
+import co.elastic.logging.DataStreamFieldSanitizer;
 import co.elastic.logging.EcsJsonSerializer;
 import co.elastic.logging.AdditionalField;
 import org.apache.log4j.Layout;
@@ -41,7 +42,8 @@ public class EcsLayout extends Layout {
     private boolean stackTraceAsArray = false;
     private String serviceName;
     private boolean includeOrigin;
-    private String eventDataset;
+    private String dataset;
+    private String dataStreamNamespace;
     private List<AdditionalField> additionalFields = new ArrayList<AdditionalField>();
 
     @Override
@@ -52,7 +54,8 @@ public String format(LoggingEvent event) {
         EcsJsonSerializer.serializeFormattedMessage(builder, event.getRenderedMessage());
         EcsJsonSerializer.serializeEcsVersion(builder);
         EcsJsonSerializer.serializeServiceName(builder, serviceName);
-        EcsJsonSerializer.serializeEventDataset(builder, eventDataset);
+        EcsJsonSerializer.serializeDataset(builder, dataset);
+        EcsJsonSerializer.serializeNamespace(builder, dataStreamNamespace);
         EcsJsonSerializer.serializeThreadName(builder, event.getThreadName());
         EcsJsonSerializer.serializeLoggerName(builder, event.categoryName);
         EcsJsonSerializer.serializeAdditionalFields(builder, additionalFields);
@@ -92,7 +95,7 @@ public boolean ignoresThrowable() {
 
     @Override
     public void activateOptions() {
-        eventDataset = EcsJsonSerializer.computeEventDataset(eventDataset, serviceName);
+        setDataset(EcsJsonSerializer.computeDataset(dataset, serviceName));
     }
 
     public void setServiceName(String serviceName) {
@@ -107,8 +110,20 @@ public void setStackTraceAsArray(boolean stackTraceAsArray) {
         this.stackTraceAsArray = stackTraceAsArray;
     }
 
-    public void setEventDataset(String eventDataset) {
-        this.eventDataset = eventDataset;
+    public void setEventDataset(String dataset) {
+        setDataset(dataset);
+    }
+
+    public void setDataStreamDataset(String dataset) {
+        setDataset(dataset);
+    }
+
+    private void setDataset(String dataset) {
+        this.dataset = DataStreamFieldSanitizer.sanitizeDataStreamDataset(dataset);
+    }
+
+    public void setDataStreamNamespace(String dataStreamNamespace) {
+        this.dataStreamNamespace = DataStreamFieldSanitizer.sanitizeDataStreamNamespace(dataStreamNamespace);
     }
 
     public void setAdditionalField(String additionalField) {
diff --git a/log4j-ecs-layout/src/test/java/co/elastic/logging/log4j/Log4jEcsLayoutTest.java b/log4j-ecs-layout/src/test/java/co/elastic/logging/log4j/Log4jEcsLayoutTest.java
index 0796c5bd..23fce8a6 100644
--- a/log4j-ecs-layout/src/test/java/co/elastic/logging/log4j/Log4jEcsLayoutTest.java
+++ b/log4j-ecs-layout/src/test/java/co/elastic/logging/log4j/Log4jEcsLayoutTest.java
@@ -53,6 +53,7 @@ void setUp() {
         ecsLayout.setServiceName("test");
         ecsLayout.setIncludeOrigin(true);
         ecsLayout.setEventDataset("testdataset.log");
+        ecsLayout.setDataStreamNamespace("custom*namespace");
         ecsLayout.activateOptions();
         ecsLayout.setAdditionalField("key1=value1");
         ecsLayout.setAdditionalField("key2=value2");
diff --git a/log4j-ecs-layout/src/test/resources/log4j.xml b/log4j-ecs-layout/src/test/resources/log4j.xml
index b91729e4..8cd5a316 100644
--- a/log4j-ecs-layout/src/test/resources/log4j.xml
+++ b/log4j-ecs-layout/src/test/resources/log4j.xml
@@ -5,6 +5,7 @@
         <layout class="co.elastic.logging.log4j.EcsLayout">
             <param name="serviceName" value="test"/>
             <param name="eventDataset" value="testdataset.log"/>
+            <param name="dataStreamNamespace" value="custom*namespace"/>
             <param name="includeOrigin" value="true"/>
             <param name="additionalField" value="key1=value1"/>
             <param name="additionalField" value="key2=value2"/>
diff --git a/log4j2-ecs-layout/src/main/java/co/elastic/logging/log4j2/EcsLayout.java b/log4j2-ecs-layout/src/main/java/co/elastic/logging/log4j2/EcsLayout.java
index 36362554..beec6fdc 100644
--- a/log4j2-ecs-layout/src/main/java/co/elastic/logging/log4j2/EcsLayout.java
+++ b/log4j2-ecs-layout/src/main/java/co/elastic/logging/log4j2/EcsLayout.java
@@ -25,6 +25,7 @@
 package co.elastic.logging.log4j2;
 
 
+import co.elastic.logging.DataStreamFieldSanitizer;
 import co.elastic.logging.EcsJsonSerializer;
 import co.elastic.logging.JsonUtils;
 import org.apache.logging.log4j.Marker;
@@ -68,20 +69,22 @@ public class EcsLayout extends AbstractStringLayout {
     private final PatternFormatter[][] fieldValuePatternFormatter;
     private final boolean stackTraceAsArray;
     private final String serviceName;
-    private final String eventDataset;
+    private final String dataset;
+    private final String dataStreamNamespace;
     private final boolean includeMarkers;
     private final boolean includeOrigin;
     private final ConcurrentMap<Class<? extends MultiformatMessage>, Boolean> supportsJson = new ConcurrentHashMap<Class<? extends MultiformatMessage>, Boolean>();
 
-    private EcsLayout(Configuration config, String serviceName, String eventDataset, boolean includeMarkers, KeyValuePair[] additionalFields, boolean includeOrigin, boolean stackTraceAsArray) {
+    private EcsLayout(Configuration config, String serviceName, String dataset, boolean includeMarkers, KeyValuePair[] additionalFields, boolean includeOrigin, boolean stackTraceAsArray, String dataStreamNamespace) {
         super(config, UTF_8, null, null);
         this.serviceName = serviceName;
-        this.eventDataset = eventDataset;
+        this.dataset = dataset;
         this.includeMarkers = includeMarkers;
         this.includeOrigin = includeOrigin;
         this.stackTraceAsArray = stackTraceAsArray;
         this.additionalFields = additionalFields;
         fieldValuePatternFormatter = new PatternFormatter[additionalFields.length][];
+        this.dataStreamNamespace = dataStreamNamespace;
         for (int i = 0; i < additionalFields.length; i++) {
             KeyValuePair additionalField = additionalFields[i];
             if (additionalField.getValue().contains("%")) {
@@ -125,7 +128,8 @@ private StringBuilder toText(LogEvent event, StringBuilder builder, boolean gcFr
         serializeMessage(builder, gcFree, event.getMessage(), event.getThrown());
         EcsJsonSerializer.serializeEcsVersion(builder);
         EcsJsonSerializer.serializeServiceName(builder, serviceName);
-        EcsJsonSerializer.serializeEventDataset(builder, eventDataset);
+        EcsJsonSerializer.serializeDataset(builder, dataset);
+        EcsJsonSerializer.serializeNamespace(builder, dataStreamNamespace);
         EcsJsonSerializer.serializeThreadName(builder, event.getThreadName());
         EcsJsonSerializer.serializeLoggerName(builder, event.getLoggerName());
         serializeAdditionalFieldsAndMDC(event, builder);
@@ -326,6 +330,10 @@ public static class Builder implements org.apache.logging.log4j.core.util.Builde
         private String serviceName;
         @PluginBuilderAttribute("eventDataset")
         private String eventDataset;
+        @PluginBuilderAttribute("dataStreamDataset")
+        private String dataStreamDataset;
+        @PluginBuilderAttribute("dataStreamNamespace")
+        private String dataStreamNamespace;
         @PluginBuilderAttribute("includeMarkers")
         private boolean includeMarkers = false;
         @PluginBuilderAttribute("stackTraceAsArray")
@@ -387,6 +395,16 @@ public EcsLayout.Builder setEventDataset(String eventDataset) {
             return this;
         }
 
+        public EcsLayout.Builder setDataStreamDataset(String dataStreamDataset) {
+            this.dataStreamDataset = dataStreamDataset;
+            return this;
+        }
+
+        public EcsLayout.Builder setDataStreamNamespace(String dataStreamNamespace) {
+            this.dataStreamNamespace = dataStreamNamespace;
+            return this;
+        }
+
         public EcsLayout.Builder setIncludeMarkers(final boolean includeMarkers) {
             this.includeMarkers = includeMarkers;
             return this;
@@ -404,7 +422,9 @@ public EcsLayout.Builder setStackTraceAsArray(boolean stackTraceAsArray) {
 
         @Override
         public EcsLayout build() {
-            return new EcsLayout(getConfiguration(), serviceName, EcsJsonSerializer.computeEventDataset(eventDataset, serviceName), includeMarkers, additionalFields, includeOrigin, stackTraceAsArray);
+            String dataset = EcsJsonSerializer.computeDataset(dataStreamDataset != null ? dataStreamDataset : eventDataset, serviceName);
+            return new EcsLayout(getConfiguration(), serviceName, dataset, includeMarkers,
+                    additionalFields, includeOrigin, stackTraceAsArray, DataStreamFieldSanitizer.sanitizeDataStreamNamespace(dataStreamNamespace));
         }
 
         public boolean isStackTraceAsArray() {
diff --git a/log4j2-ecs-layout/src/test/java/co/elastic/logging/log4j2/Log4j2EcsLayoutTest.java b/log4j2-ecs-layout/src/test/java/co/elastic/logging/log4j2/Log4j2EcsLayoutTest.java
index c48d9d60..de96c626 100644
--- a/log4j2-ecs-layout/src/test/java/co/elastic/logging/log4j2/Log4j2EcsLayoutTest.java
+++ b/log4j2-ecs-layout/src/test/java/co/elastic/logging/log4j2/Log4j2EcsLayoutTest.java
@@ -70,6 +70,7 @@ void setUp() {
                 .setIncludeMarkers(true)
                 .setIncludeOrigin(true)
                 .setEventDataset("testdataset.log")
+                .setDataStreamNamespace("custom*namespace")
                 .setAdditionalFields(new KeyValuePair[]{
                         new KeyValuePair("cluster.uuid", "9fe9134b-20b0-465e-acf9-8cc09ac9053b"),
                         new KeyValuePair("node.id", "${node.id}"),
diff --git a/log4j2-ecs-layout/src/test/resources/log4j2-test.xml b/log4j2-ecs-layout/src/test/resources/log4j2-test.xml
index 35d2b53a..01a8a0f1 100644
--- a/log4j2-ecs-layout/src/test/resources/log4j2-test.xml
+++ b/log4j2-ecs-layout/src/test/resources/log4j2-test.xml
@@ -5,7 +5,7 @@
     </Properties>
     <Appenders>
         <List name="TestAppender">
-            <EcsLayout serviceName="test" includeMarkers="true" includeOrigin="true" eventDataset="testdataset.log">
+            <EcsLayout serviceName="test" includeMarkers="true" includeOrigin="true" eventDataset="testdataset.log" dataStreamNamespace="custom*namespace">
                 <KeyValuePair key="cluster.uuid" value="9fe9134b-20b0-465e-acf9-8cc09ac9053b"/>
                 <KeyValuePair key="node.id" value="${node.id}"/>
                 <KeyValuePair key="empty" value="${empty}"/>
diff --git a/logback-ecs-encoder/src/main/java/co/elastic/logging/logback/EcsEncoder.java b/logback-ecs-encoder/src/main/java/co/elastic/logging/logback/EcsEncoder.java
index 87a018e8..12a6a14f 100644
--- a/logback-ecs-encoder/src/main/java/co/elastic/logging/logback/EcsEncoder.java
+++ b/logback-ecs-encoder/src/main/java/co/elastic/logging/logback/EcsEncoder.java
@@ -11,9 +11,9 @@
  * the Apache License, Version 2.0 (the "License"); you may
  * not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,8 +29,9 @@
 import ch.qos.logback.classic.spi.IThrowableProxy;
 import ch.qos.logback.classic.spi.ThrowableProxy;
 import ch.qos.logback.core.encoder.EncoderBase;
-import co.elastic.logging.EcsJsonSerializer;
 import co.elastic.logging.AdditionalField;
+import co.elastic.logging.DataStreamFieldSanitizer;
+import co.elastic.logging.EcsJsonSerializer;
 import org.slf4j.Marker;
 
 import java.io.IOException;
@@ -43,13 +44,14 @@
 public class EcsEncoder extends EncoderBase<ILoggingEvent> {
 
     private static final Charset UTF_8 = Charset.forName("UTF-8");
+    private final List<AdditionalField> additionalFields = new ArrayList<AdditionalField>();
     private boolean stackTraceAsArray = false;
     private String serviceName;
-    private String eventDataset;
+    private String dataset;
+    private String dataStreamNamespace;
     private boolean includeMarkers = false;
     private ThrowableProxyConverter throwableProxyConverter;
     private boolean includeOrigin;
-    private final List<AdditionalField> additionalFields = new ArrayList<AdditionalField>();
     private OutputStream os;
 
     @Override
@@ -62,8 +64,9 @@ public void start() {
         super.start();
         throwableProxyConverter = new ThrowableProxyConverter();
         throwableProxyConverter.start();
-        eventDataset = EcsJsonSerializer.computeEventDataset(eventDataset, serviceName);
+        setDataset(EcsJsonSerializer.computeDataset(dataset, serviceName));
     }
+
     /**
      * This method has been removed in logback 1.2.
      * To make this lib backwards compatible with logback 1.1 we have implement this method.
@@ -96,7 +99,8 @@ public byte[] encode(ILoggingEvent event) {
         EcsJsonSerializer.serializeEcsVersion(builder);
         serializeMarkers(event, builder);
         EcsJsonSerializer.serializeServiceName(builder, serviceName);
-        EcsJsonSerializer.serializeEventDataset(builder, eventDataset);
+        EcsJsonSerializer.serializeDataset(builder, dataset);
+        EcsJsonSerializer.serializeNamespace(builder, dataStreamNamespace);
         EcsJsonSerializer.serializeThreadName(builder, event.getThreadName());
         EcsJsonSerializer.serializeLoggerName(builder, event.getLoggerName());
         EcsJsonSerializer.serializeAdditionalFields(builder, additionalFields);
@@ -162,8 +166,19 @@ public void addAdditionalField(AdditionalField pair) {
         this.additionalFields.add(pair);
     }
 
-    public void setEventDataset(String eventDataset) {
-        this.eventDataset = eventDataset;
+    public void setEventDataset(String dataset) {
+        setDataset(dataset);
     }
 
+    public void setDataStreamDataset(String dataset) {
+        setDataset(dataset);
+    }
+
+    private void setDataset(String dataset) {
+        this.dataset = DataStreamFieldSanitizer.sanitizeDataStreamDataset(dataset);
+    }
+
+    public void setDataStreamNamespace(String dataStreamNamespace) {
+        this.dataStreamNamespace = DataStreamFieldSanitizer.sanitizeDataStreamNamespace(dataStreamNamespace);
+    }
 }
diff --git a/logback-ecs-encoder/src/test/java/co/elastic/logging/logback/EcsEncoderTest.java b/logback-ecs-encoder/src/test/java/co/elastic/logging/logback/EcsEncoderTest.java
index 6285e53c..6436510b 100644
--- a/logback-ecs-encoder/src/test/java/co/elastic/logging/logback/EcsEncoderTest.java
+++ b/logback-ecs-encoder/src/test/java/co/elastic/logging/logback/EcsEncoderTest.java
@@ -49,6 +49,7 @@ void setUp() {
         ecsEncoder.addAdditionalField(new AdditionalField("key1", "value1"));
         ecsEncoder.addAdditionalField(new AdditionalField("key2", "value2"));
         ecsEncoder.setEventDataset("testdataset.log");
+        ecsEncoder.setDataStreamNamespace("custom*namespace");
         ecsEncoder.start();
         appender.setEncoder(ecsEncoder);
         appender.start();
diff --git a/logback-ecs-encoder/src/test/resources/logback-config.xml b/logback-ecs-encoder/src/test/resources/logback-config.xml
index bdb6f2ba..a7435e7e 100644
--- a/logback-ecs-encoder/src/test/resources/logback-config.xml
+++ b/logback-ecs-encoder/src/test/resources/logback-config.xml
@@ -7,6 +7,7 @@
             <includeOrigin>true</includeOrigin>
             <topLevelLabel>top_level</topLevelLabel>
             <eventDataset>testdataset.log</eventDataset>
+            <dataStreamNamespace>custom*namespace</dataStreamNamespace>
             <additionalField>
                 <key>key1</key>
                 <value>value1</value>